diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 015baa1..2ccd374 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1665,6 +1665,26 @@ private static void addDeprecatedKeys() { NODE_LABELS_PREFIX + "fs-store.retry-policy-spec"; public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC = "2000, 500"; + + /** + * Application Priority configurations + */ + public static final String APPLICATION_PRIORITY_PREFIX = YARN_PREFIX + + "application-priority."; + + public static final String YARN_CLUSTER_APP_PRIORITY = APPLICATION_PRIORITY_PREFIX + + "cluster"; + + public static final String RM_APP_PRIORITY_MANAGER_CLASS = APPLICATION_PRIORITY_PREFIX + + "manager-class"; + + /** URI for ApplicationPriorityManager */ + public static final String FS_APP_PRIORITY_STORE_ROOT_DIR = APPLICATION_PRIORITY_PREFIX + + "fs-store.root-dir"; + public static final String FS_APP_PRIORITY_STORE_RETRY_POLICY_SPEC = + APPLICATION_PRIORITY_PREFIX + "fs-store.retry-policy-spec"; + public static final String DEFAULT_FS_APP_PRIORITY_STORE_RETRY_POLICY_SPEC = + "2000, 500"; /** * Flag to indicate if the node labels feature enabled, by default it's diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/ApplicationPriorityStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/ApplicationPriorityStore.java new file mode 100644 index 0000000..03aa12a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/ApplicationPriorityStore.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applicationpriority; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationPriorityPerQueue; + +public abstract class ApplicationPriorityStore implements Closeable { + protected final CommonApplicationPriorityManager appPrManager; + protected Configuration conf; + + public ApplicationPriorityStore(CommonApplicationPriorityManager appPrManager) { + this.appPrManager = appPrManager; + } + + /** + * Set priority per queue + */ + public abstract void setApplicationPriorityForQueue(String queueName, + ApplicationPriorityPerQueue applicationPriorityPerQueue) + throws IOException; + + /** + * Store new application priority in cluster + */ + public abstract void addToClusterApplicationPriorities( + Set applicationPriority) throws IOException; + + /** + * Remove application priority from cluster + */ + public abstract void removeFromClusterApplicationPriorities( + Collection applicationPriority) throws IOException; + + /** + * Recover cluster priority and queue to priority mappings from store + * + * @param conf + */ + public abstract void recover() throws IOException; + + public void init(Configuration conf) throws Exception { + this.conf = conf; + } + + public CommonApplicationPriorityManager getApplicationPriorityManager() { + return appPrManager; + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/CommonApplicationPriorityManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/CommonApplicationPriorityManager.java new file mode 100644 index 0000000..5e8f479 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/CommonApplicationPriorityManager.java @@ -0,0 +1,526 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applicationpriority; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationPriorityPerQueue; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.applicationpriority.event.AddToClusterApplicationPriority; +import org.apache.hadoop.yarn.applicationpriority.event.ApplicationPriorityStoreEvent; +import org.apache.hadoop.yarn.applicationpriority.event.ApplicationPriorityStoreEventType; +import org.apache.hadoop.yarn.applicationpriority.event.RemoveFromClusterApplicationPriority; +import org.apache.hadoop.yarn.applicationpriority.event.SetApplicationPriorityPerQueue; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +public class CommonApplicationPriorityManager extends AbstractService { + + protected static final Log LOG = LogFactory + .getLog(CommonApplicationPriorityManager.class); + + public static final String EMPTY_QUEUE_NAME = ""; + + protected Dispatcher dispatcher; + + protected Set clusterPriorityCollections = new HashSet(); + + // Map to queueName vs Queue object which has priority labels + protected ConcurrentMap queueCollections = new ConcurrentHashMap(); + + protected final ReadLock readLock; + protected final WriteLock writeLock; + + protected ApplicationPriorityStore store; + private static Integer MAX_PRIORITY_VALUE = 65535; + private static Integer MIN_PRIORITY_VALUE = -65535; + + protected static class Queue { + public String queueName; + public ApplicationPriorityPerQueue applicationPriorityPerQueue; + + protected Queue() { + this.queueName = EMPTY_QUEUE_NAME; + this.applicationPriorityPerQueue = null; + } + + public Queue(String queueName, + ApplicationPriorityPerQueue applicationPriorityPerQueue) { + this.queueName = queueName; + this.applicationPriorityPerQueue = applicationPriorityPerQueue; + } + } + + public CommonApplicationPriorityManager() { + super(CommonApplicationPriorityManager.class.getName()); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + + private final class ForwardingEventHandler + implements + EventHandler { + + @Override + public void handle(ApplicationPriorityStoreEvent event) { + if (isInState(STATE.STARTED)) { + handleStoreEvent(event); + } + } + } + + // Dispatcher related code + protected void handleStoreEvent(ApplicationPriorityStoreEvent event) { + try { + switch (event.getType()) { + case SET_APPLICATION_PRIORITY_PER_QUEUE : + SetApplicationPriorityPerQueue setPriorityLabelPerQueueEvent = (SetApplicationPriorityPerQueue) event; + store.setApplicationPriorityForQueue( + setPriorityLabelPerQueueEvent.getQueueName(), + setPriorityLabelPerQueueEvent.getApplicationPriorityPerQueue()); + break; + case ADD_TO_CLUSTER_APPLICATION_PRIORITIES : + AddToClusterApplicationPriority addToClusterPriorityLabelsEvent = (AddToClusterApplicationPriority) event; + store + .addToClusterApplicationPriorities(addToClusterPriorityLabelsEvent + .getApplicationPriorities()); + break; + case REMOVE_FROM_CLUSTER_APPLICATION_PRIORITIES : + RemoveFromClusterApplicationPriority removeFromClusterPriorityLabelsEvent = (RemoveFromClusterApplicationPriority) event; + store + .removeFromClusterApplicationPriorities(removeFromClusterPriorityLabelsEvent + .getApplicationPriorities()); + break; + default : + break; + } + } catch (IOException e) { + LOG.error("Failed to store application priority modification to storage"); + throw new YarnRuntimeException(e); + } + } + + // UT purpose + protected void initDispatcher(Configuration conf) { + dispatcher = new AsyncDispatcher(); + AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher; + asyncDispatcher.init(conf); + asyncDispatcher.setDrainEventsOnStop(); + } + + // for UT purpose + protected void startDispatcher() { + // start dispatcher + AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher; + asyncDispatcher.start(); + } + + // for UT purpose + protected void stopDispatcher() { + AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher; + asyncDispatcher.stop(); + } + + protected void initApplicationPriorityStore(Configuration conf) + throws Exception { + this.store = new FileSystemApplicationPriorityStore(this); + this.store.init(conf); + this.store.recover(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + initApplicationPriorityStore(conf); + } + + @Override + protected void serviceStart() throws Exception { + initDispatcher(getConfig()); + + if (null != dispatcher) { + dispatcher.register(ApplicationPriorityStoreEventType.class, + new ForwardingEventHandler()); + } + + startDispatcher(); + } + + @Override + protected void serviceStop() throws Exception { + // finalize store + stopDispatcher(); + + // only close store when we enabled store persistent + if (null != store) { + store.close(); + } + } + + /** + * Add cluster level priority to store + * + * @param priorityToAdd + * new application priority + */ + @SuppressWarnings("unchecked") + public void addToClusterApplicationPriorities(Set priorityToAdd) + throws IOException { + if (null == priorityToAdd || priorityToAdd.isEmpty()) { + return; + } + + try { + writeLock.lock(); + Set priorityList = normalizeApplicationPriorities(priorityToAdd); + + for (Priority priority : priorityList) { + this.clusterPriorityCollections.add(priority); + } + + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new AddToClusterApplicationPriority(priorityToAdd)); + } + + LOG.info("Add to cluster priority: [" + + StringUtils.join(priorityToAdd.iterator(), ",") + "]"); + } finally { + writeLock.unlock(); + } + + } + + protected void checkRemoveFromClusterPriority(Collection prToRemove) + throws IOException { + if (null == prToRemove || prToRemove.isEmpty()) { + return; + } + + // Check if priority to remove doesn't exist or null/empty, will throw + // exception + for (Integer priority : prToRemove) { + if (priority == null) { + throw new IOException("Priority to be removed is null"); + } + + if (!isPriorityExistsInCluster(priority)) { + throw new IOException("Priority =" + priority + + " to be removed doesn't exist in cluster " + + "priority collection."); + } + + // check if any queue contains this label + for (Entry entry : queueCollections.entrySet()) { + String queueName = entry.getKey(); + ApplicationPriorityPerQueue queueLabels = entry.getValue().applicationPriorityPerQueue; + if (queueLabels.getDefaultApplicationPriority().equals(priority) + || queueLabels.getMaxApplicationPriority().equals(priority)) { + throw new IOException("Cannot remove priority label=" + priority + + ", because queue=" + queueName + " is using this label. " + + "Please remove label on queue before remove the label"); + } + } + } + } + + @SuppressWarnings("unchecked") + protected void internalRemoveFromClusterPriority( + Collection prToRemove) { + + // remove labels from queue labels collection + for (Integer priority : prToRemove) { + clusterPriorityCollections.remove(Priority.newInstance(priority)); + } + + // create event to remove labels + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new RemoveFromClusterApplicationPriority(prToRemove)); + } + + LOG.info("Remove priority: [" + + StringUtils.join(prToRemove.iterator(), ",") + "]"); + } + + /** + * Remove from cluster priority list + * + * @param priorityToRemove + * app priority to remove + * @throws IOException + */ + public void removeFromClusterApplicationPriority( + Collection priorityToRemove) throws IOException { + try { + writeLock.lock(); + checkRemoveFromClusterPriority(priorityToRemove); + internalRemoveFromClusterPriority(priorityToRemove); + } finally { + writeLock.unlock(); + } + } + + protected void checkReplacePriorityOnQueue( + Map replacePriorityOnQueue) + throws IOException { + if (null == replacePriorityOnQueue || replacePriorityOnQueue.isEmpty()) { + return; + } + + // check all labels being added existed + for (Entry entry : replacePriorityOnQueue + .entrySet()) { + if (!clusterPriorityCollections.contains(Priority.newInstance(entry + .getValue().getMaxApplicationPriority())) + || !clusterPriorityCollections.contains(Priority.newInstance(entry + .getValue().getDefaultApplicationPriority()))) { + String msg = "Not all priorities being replaced contained by known " + + "priority collections, please check"; + LOG.error(msg); + throw new IOException(msg); + } + } + } + + @SuppressWarnings("unchecked") + protected void internalReplacePriorityOnQueue( + Map replacePriorityOnQueue) + throws IOException { + // do replace labels to queues + for (Entry entry : replacePriorityOnQueue + .entrySet()) { + String queueName = entry.getKey(); + ApplicationPriorityPerQueue applicationPriorityPerQueue = entry + .getValue(); + + Queue queue = queueCollections.get(queueName); + if (queue == null) { + queue = new Queue(queueName, applicationPriorityPerQueue); + this.queueCollections.put(queueName, queue); + } + + queue.applicationPriorityPerQueue = applicationPriorityPerQueue; + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new SetApplicationPriorityPerQueue(queue.queueName, + queue.applicationPriorityPerQueue)); + } + } + } + + /** + * replace priority to queue + * + * @param applicationPriorityOnQueue + * queue -> priority map + */ + public void replaceApplicationPriorityOnQueue( + Map applicationPriorityOnQueue) + throws IOException { + try { + writeLock.lock(); + checkReplacePriorityOnQueue(applicationPriorityOnQueue); + + internalReplacePriorityOnQueue(applicationPriorityOnQueue); + } finally { + writeLock.unlock(); + } + } + + /** + * Set multiple priorities to Queue + * + * @param queueName + * , applicationPriorityPerQueue max and default priority + */ + @SuppressWarnings("unchecked") + public void setApplicationPriorityPerQueue(String queueName, + ApplicationPriorityPerQueue applicationPriorityPerQueue) + throws IOException { + if (null == applicationPriorityPerQueue) { + return; + } + + // Validate whether configured priorities are present in Cluster too. + if (!validatePerQueueApplicationPriorityInCluster(applicationPriorityPerQueue)) { + throw new IOException("Priority label=" + applicationPriorityPerQueue + + " to be set to Queue = '" + queueName + + "', doesn't exist in cluster " + "priority labels collection."); + } + + try { + writeLock.lock(); + + Queue currentQueue = null; + if (this.queueCollections.containsKey(queueName)) { + currentQueue = this.queueCollections.get(queueName); + currentQueue.applicationPriorityPerQueue = applicationPriorityPerQueue; + } else { + currentQueue = new Queue(queueName, applicationPriorityPerQueue); + this.queueCollections.put(queueName, currentQueue); + } + + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new SetApplicationPriorityPerQueue(queueName, + currentQueue.applicationPriorityPerQueue)); + } + } finally { + writeLock.unlock(); + } + + LOG.info("Set priority labels to Queue: QueueName - " + queueName + + "Max Priority - " + + applicationPriorityPerQueue.getMaxApplicationPriority() + + ", Default Priority - " + + applicationPriorityPerQueue.getDefaultApplicationPriority()); + } + + /** + * Get existing valid labels in cluster + * + * @return existing valid labels in cluster + */ + public Set getClusterPriorities() { + try { + readLock.lock(); + return Collections + .unmodifiableSet(getPrioritiesToSave(clusterPriorityCollections)); + } finally { + readLock.unlock(); + } + } + + /** + * Get cluster priority from Queue + * + * @param queueName + * @return PriorityLabelsPerQueue + * @throws IOException + */ + public ApplicationPriorityPerQueue getApplicationPriorityFromQueue( + String queueName) throws IOException { + try { + readLock.lock(); + Queue currentQueue = null; + if (this.queueCollections.containsKey(queueName)) { + currentQueue = this.queueCollections.get(queueName); + } else { + throw new IOException("Priority labels for Queue: '" + queueName + + "' doesn't exist in cluster."); + } + return currentQueue.applicationPriorityPerQueue; + } finally { + readLock.unlock(); + } + } + + /** + * Get mapping of queue to priority + * + * @return queues to priority map + */ + public Map getApplicationPrioritiesPerQueue() { + try { + readLock.lock(); + Map queueToPriorities = new HashMap(); + for (Entry entry : queueCollections.entrySet()) { + String queueName = entry.getKey(); + Queue queue = entry.getValue(); + + queueToPriorities.put(queueName, queue.applicationPriorityPerQueue); + } + return Collections.unmodifiableMap(queueToPriorities); + } finally { + readLock.unlock(); + } + } + + private Set getPrioritiesToSave(Set priorityList) { + Set prioritySet = new HashSet(); + for (Priority priority : priorityList) { + prioritySet.add(priority.getPriority()); + } + return prioritySet; + } + + private Set normalizeApplicationPriorities( + Set applicationPriorities) { + Set priorityList = new HashSet(); + + for (Integer priority : applicationPriorities) { + if (priority < MIN_PRIORITY_VALUE || priority > MAX_PRIORITY_VALUE) { + continue; // skip invalid entries + } + priorityList.add(Priority.newInstance(priority)); + } + return priorityList; + } + + private boolean validatePerQueueApplicationPriorityInCluster( + ApplicationPriorityPerQueue applicationPriorityPerQueue) { + if (!isPriorityExistsInCluster(applicationPriorityPerQueue + .getDefaultApplicationPriority())) { + return false; + } + + if (!isPriorityExistsInCluster(applicationPriorityPerQueue + .getMaxApplicationPriority())) { + return false; + } + + return true; + } + + /** + * Verify whether priority exists in repository + * + * @return true or false + */ + public boolean isPriorityExistsInCluster(Integer priorityValue) { + try { + readLock.lock(); + Priority priority = Priority.newInstance(priorityValue); + return this.clusterPriorityCollections.contains(priority); + } finally { + readLock.unlock(); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/FileSystemApplicationPriorityStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/FileSystemApplicationPriorityStore.java new file mode 100644 index 0000000..e8219c2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/FileSystemApplicationPriorityStore.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applicationpriority; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationPriorityPerQueue; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationPriorityPerQueuePBImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationPriorityPerQueueProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterLevelApplicationPriorityRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveApplicationPriorityFromClusterRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceApplicationPriorityOnQueueRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SetApplicationPriorityForQueueRequestProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterLevelApplicationPriorityRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveApplicationPriorityFromClusterRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceApplicationPriorityOnQueueRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SetApplicationPriorityForQueueRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterLevelApplicationPriorityRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveApplicationPriorityFromClusterRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceApplicationPriorityOnQueueRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SetApplicationPriorityForQueueRequestPBImpl; + +import com.google.common.collect.Sets; + +public class FileSystemApplicationPriorityStore + extends + ApplicationPriorityStore { + + public FileSystemApplicationPriorityStore(CommonApplicationPriorityManager mgr) { + super(mgr); + } + + protected static final Log LOG = LogFactory + .getLog(FileSystemApplicationPriorityStore.class); + + protected static final String DEFAULT_DIR_NAME = "application-priority"; + protected static final String MIRROR_FILENAME = "applicationpriority.mirror"; + protected static final String EDITLOG_FILENAME = "prioritylabel.editlog"; + + protected enum SerializedLogType { + ADD_TO_CLUSTER_APPLICATION_PRIORITY, SET_APPLICATION_PRIORITY_PER_QUEUE, REMOVE_FROM_CLUSTER_APPLICATION_PRIORITY + } + + Path fsWorkingPath; + FileSystem fs; + FSDataOutputStream editlogOs; + Path editLogPath; + + private String getDefaultFSApplicationPriorityRootDir() throws IOException { + // default is in local: /tmp/hadoop-yarn-${user}/application-priority/ + return "file:///tmp/hadoop-yarn-" + + UserGroupInformation.getCurrentUser().getShortUserName() + "/" + + DEFAULT_DIR_NAME; + } + + @Override + public void init(Configuration conf) throws Exception { + fsWorkingPath = new Path(conf.get( + YarnConfiguration.FS_APP_PRIORITY_STORE_ROOT_DIR, + getDefaultFSApplicationPriorityRootDir())); + + setFileSystem(conf); + + // mkdir of root dir path + fs.mkdirs(fsWorkingPath); + } + + @Override + public void close() throws IOException { + try { + fs.close(); + editlogOs.close(); + } catch (IOException e) { + LOG.warn("Exception happened whiling shutting down,", e); + } + } + + private void setFileSystem(Configuration conf) throws IOException { + Configuration confCopy = new Configuration(conf); + confCopy.setBoolean("dfs.client.retry.policy.enabled", true); + String retryPolicy = confCopy.get( + YarnConfiguration.FS_APP_PRIORITY_STORE_RETRY_POLICY_SPEC, + YarnConfiguration.DEFAULT_FS_APP_PRIORITY_STORE_RETRY_POLICY_SPEC); + confCopy.set("dfs.client.retry.policy.spec", retryPolicy); + fs = fsWorkingPath.getFileSystem(confCopy); + + // if it's local file system, use RawLocalFileSystem instead of + // LocalFileSystem, the latter one doesn't support append. + if (fs.getScheme().equals("file")) { + fs = ((LocalFileSystem) fs).getRaw(); + } + } + + private void ensureAppendEditlogFile() throws IOException { + editlogOs = fs.append(editLogPath); + } + + private void ensureCloseEditlogFile() throws IOException { + editlogOs.close(); + } + + private ApplicationPriorityPerQueuePBImpl convertFromProtoFormat( + ApplicationPriorityPerQueueProto p) { + return new ApplicationPriorityPerQueuePBImpl(p); + } + + @Override + public void setApplicationPriorityForQueue(String queueName, + ApplicationPriorityPerQueue applicationPriorityPerQueue) + throws IOException { + ensureAppendEditlogFile(); + editlogOs.writeInt(SerializedLogType.SET_APPLICATION_PRIORITY_PER_QUEUE + .ordinal()); + ((SetApplicationPriorityForQueueRequestPBImpl) SetApplicationPriorityForQueueRequest + .newInstance(queueName, applicationPriorityPerQueue)).getProto() + .writeDelimitedTo(editlogOs); + ensureCloseEditlogFile(); + } + + @Override + public void addToClusterApplicationPriorities(Set applicationPriority) + throws IOException { + ensureAppendEditlogFile(); + editlogOs.writeInt(SerializedLogType.ADD_TO_CLUSTER_APPLICATION_PRIORITY + .ordinal()); + ((AddToClusterLevelApplicationPriorityRequestPBImpl) AddToClusterLevelApplicationPriorityRequest + .newInstance(applicationPriority)).getProto().writeDelimitedTo( + editlogOs); + ensureCloseEditlogFile(); + } + + @Override + public void removeFromClusterApplicationPriorities( + Collection applicationPriority) throws IOException { + ensureAppendEditlogFile(); + editlogOs + .writeInt(SerializedLogType.REMOVE_FROM_CLUSTER_APPLICATION_PRIORITY + .ordinal()); + ((RemoveApplicationPriorityFromClusterRequestPBImpl) RemoveApplicationPriorityFromClusterRequest + .newInstance(Sets.newHashSet(applicationPriority.iterator()))) + .getProto().writeDelimitedTo(editlogOs); + ensureCloseEditlogFile(); + } + + @Override + public void recover() throws IOException { + /* + * Steps of recover 1) Read from last mirror (from mirror or mirror.old) 2) + * Read from last edit log, and apply such edit log 3) Write new mirror to + * mirror.writing 4) Rename mirror to mirror.old 5) Move mirror.writing to + * mirror 6) Remove mirror.old 7) Remove edit log and create a new empty + * edit log + */ + + // Open mirror from serialized file + Path mirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME); + Path oldMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".old"); + + FSDataInputStream is = null; + if (fs.exists(mirrorPath)) { + is = fs.open(mirrorPath); + } else if (fs.exists(oldMirrorPath)) { + is = fs.open(oldMirrorPath); + } + + if (null != is) { + Set priority = new AddToClusterLevelApplicationPriorityRequestPBImpl( + AddToClusterLevelApplicationPriorityRequestProto + .parseDelimitedFrom(is)).getApplicationPriorities(); + Map queueToPriority = new ReplaceApplicationPriorityOnQueueRequestPBImpl( + ReplaceApplicationPriorityOnQueueRequestProto.parseDelimitedFrom(is)) + .getQueueToApplicationPriority(); + appPrManager.addToClusterApplicationPriorities(priority); + appPrManager.replaceApplicationPriorityOnQueue(queueToPriority); + is.close(); + } + + // Open and process editlog + editLogPath = new Path(fsWorkingPath, EDITLOG_FILENAME); + if (fs.exists(editLogPath)) { + is = fs.open(editLogPath); + + while (true) { + try { + // read edit log one by one + SerializedLogType type = SerializedLogType.values()[is.readInt()]; + + switch (type) { + case SET_APPLICATION_PRIORITY_PER_QUEUE : { + SetApplicationPriorityForQueueRequestProto proto = SetApplicationPriorityForQueueRequestProto + .parseDelimitedFrom(is); + String queueName = proto.getQueueName(); + ApplicationPriorityPerQueue applicationPriorityPerQueue = convertFromProtoFormat(proto + .getApplicatonPriorityPerQueue()); + appPrManager.setApplicationPriorityPerQueue(queueName, + applicationPriorityPerQueue); + break; + } + case ADD_TO_CLUSTER_APPLICATION_PRIORITY : { + Collection priorityToAdd = AddToClusterLevelApplicationPriorityRequestProto + .parseDelimitedFrom(is).getApplicationPrioritiesList(); + appPrManager.addToClusterApplicationPriorities(Sets + .newHashSet(priorityToAdd.iterator())); + break; + } + case REMOVE_FROM_CLUSTER_APPLICATION_PRIORITY : { + Collection priorityToRemove = RemoveApplicationPriorityFromClusterRequestProto + .parseDelimitedFrom(is).getApplicationPrioritiesList(); + appPrManager.removeFromClusterApplicationPriority(priorityToRemove); + break; + } + default : + break; + } + } catch (EOFException e) { + break; + } + } + } + + // Serialize current mirror to mirror.writing + Path writingMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + + ".writing"); + FSDataOutputStream os = fs.create(writingMirrorPath, true); + ((AddToClusterLevelApplicationPriorityRequestPBImpl) AddToClusterLevelApplicationPriorityRequest + .newInstance(appPrManager.getClusterPriorities())) + .getProto().writeDelimitedTo(os); + ((ReplaceApplicationPriorityOnQueueRequestPBImpl) ReplaceApplicationPriorityOnQueueRequest + .newInstance(appPrManager.getApplicationPrioritiesPerQueue())).getProto() + .writeDelimitedTo(os); + os.close(); + + // Move mirror to mirror.old + if (fs.exists(mirrorPath)) { + fs.delete(oldMirrorPath, false); + fs.rename(mirrorPath, oldMirrorPath); + } + + // move mirror.writing to mirror + fs.rename(writingMirrorPath, mirrorPath); + fs.delete(writingMirrorPath, false); + + // remove mirror.old + fs.delete(oldMirrorPath, false); + + // create a new editlog file + editlogOs = fs.create(editLogPath, true); + editlogOs.close(); + + LOG.info("Finished write mirror at:" + mirrorPath.toString()); + LOG.info("Finished create editlog file at:" + editLogPath.toString()); + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/event/AddToClusterApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/event/AddToClusterApplicationPriority.java new file mode 100644 index 0000000..72f500d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/event/AddToClusterApplicationPriority.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applicationpriority.event; + +import java.util.Set; + +public class AddToClusterApplicationPriority + extends + ApplicationPriorityStoreEvent { + private Set applicationPriorities; + + public AddToClusterApplicationPriority(Set applicationPriorities) { + super(ApplicationPriorityStoreEventType.ADD_TO_CLUSTER_APPLICATION_PRIORITIES); + this.applicationPriorities = applicationPriorities; + } + + public Set getApplicationPriorities() { + return this.applicationPriorities; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/event/ApplicationPriorityStoreEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/event/ApplicationPriorityStoreEvent.java new file mode 100644 index 0000000..74b988d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/event/ApplicationPriorityStoreEvent.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applicationpriority.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class ApplicationPriorityStoreEvent extends + AbstractEvent { + public ApplicationPriorityStoreEvent(ApplicationPriorityStoreEventType type) { + super(type); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/event/ApplicationPriorityStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/event/ApplicationPriorityStoreEventType.java new file mode 100644 index 0000000..991a0b3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/event/ApplicationPriorityStoreEventType.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applicationpriority.event; + +public enum ApplicationPriorityStoreEventType { + ADD_TO_CLUSTER_APPLICATION_PRIORITIES, + SET_APPLICATION_PRIORITY_PER_QUEUE, + REMOVE_FROM_CLUSTER_APPLICATION_PRIORITIES +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/event/RemoveFromClusterApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/event/RemoveFromClusterApplicationPriority.java new file mode 100644 index 0000000..2b430d7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/event/RemoveFromClusterApplicationPriority.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applicationpriority.event; + +import java.util.Collection; + +public class RemoveFromClusterApplicationPriority + extends + ApplicationPriorityStoreEvent { + private Collection applicationPriority; + + public RemoveFromClusterApplicationPriority( + Collection applicationPriority) { + super(ApplicationPriorityStoreEventType.REMOVE_FROM_CLUSTER_APPLICATION_PRIORITIES); + this.applicationPriority = applicationPriority; + } + + public Collection getApplicationPriorities() { + return applicationPriority; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/event/SetApplicationPriorityPerQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/event/SetApplicationPriorityPerQueue.java new file mode 100644 index 0000000..8163ff8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/applicationpriority/event/SetApplicationPriorityPerQueue.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applicationpriority.event; + +import org.apache.hadoop.yarn.api.records.ApplicationPriorityPerQueue; + + +public class SetApplicationPriorityPerQueue extends ApplicationPriorityStoreEvent { + private String queueName; + private ApplicationPriorityPerQueue applicationPriorityPerQueue; + + public SetApplicationPriorityPerQueue(String queueName, + ApplicationPriorityPerQueue applicationPriorityPerQueue) { + super(ApplicationPriorityStoreEventType.SET_APPLICATION_PRIORITY_PER_QUEUE); + this.queueName = queueName; + this.applicationPriorityPerQueue = applicationPriorityPerQueue; + } + + public String getQueueName() { + return queueName; + } + + public ApplicationPriorityPerQueue getApplicationPriorityPerQueue() { + return applicationPriorityPerQueue; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/applicationpriority/ApplicationPriorityTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/applicationpriority/ApplicationPriorityTestBase.java new file mode 100644 index 0000000..dd329a6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/applicationpriority/ApplicationPriorityTestBase.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applicationpriority; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.ApplicationPriorityPerQueue; +import org.junit.Assert; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; + +public class ApplicationPriorityTestBase { + public static void assertMapEquals( + Map m1, + ImmutableMap m2) { + Assert.assertEquals(m1.size(), m2.size()); + for (String k : m1.keySet()) { + Assert.assertTrue(m2.containsKey(k)); + m1.get(k).equals(m2.get(k)); + } + } + + public static void assertMapContains( + Map m1, + ImmutableMap m2) { + for (String k : m2.keySet()) { + Assert.assertTrue(m1.containsKey(k)); + m1.get(k).equals(m2.get(k)); + } + } + + public static void assertCollectionEquals(Collection c1, + Collection c2) { + Set s1 = new HashSet(c1); + Set s2 = new HashSet(c2); + Assert.assertEquals(s1, s2); + Assert.assertTrue(s1.containsAll(s2)); + } + + public static Set toSet(E... elements) { + Set set = Sets.newHashSet(elements); + return set; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/applicationpriority/DummyCommonApplicationPriorityManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/applicationpriority/DummyCommonApplicationPriorityManager.java new file mode 100644 index 0000000..6d36e4f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/applicationpriority/DummyCommonApplicationPriorityManager.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applicationpriority; + +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationPriorityPerQueue; +import org.apache.hadoop.yarn.event.InlineDispatcher; + +public class DummyCommonApplicationPriorityManager + extends + CommonApplicationPriorityManager { + + Collection lastAddedPrioritylabels = null; + Collection lastRemovedPrioritylabels = null; + + @Override + public void initApplicationPriorityStore(Configuration conf) { + this.store = new ApplicationPriorityStore(this) { + + @Override + public void recover() throws IOException { + } + + @Override + public void close() throws IOException { + // do nothing + } + + @Override + public void setApplicationPriorityForQueue(String queueName, + ApplicationPriorityPerQueue applicationPriorityPerQueue) + throws IOException { + } + + @Override + public void addToClusterApplicationPriorities( + Set applicationPriority) throws IOException { + lastAddedPrioritylabels = applicationPriority; + + } + + @Override + public void removeFromClusterApplicationPriorities( + Collection applicationPriority) throws IOException { + lastRemovedPrioritylabels = applicationPriority; + } + }; + } + + @Override + protected void initDispatcher(Configuration conf) { + super.dispatcher = new InlineDispatcher(); + } + + @Override + protected void startDispatcher() { + // do nothing + } + + @Override + protected void stopDispatcher() { + // do nothing + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/applicationpriority/TestCommonApplicationPriorityManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/applicationpriority/TestCommonApplicationPriorityManager.java new file mode 100644 index 0000000..15afe28 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/applicationpriority/TestCommonApplicationPriorityManager.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applicationpriority; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationPriorityPerQueue; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +public class TestCommonApplicationPriorityManager + extends + ApplicationPriorityTestBase { + DummyCommonApplicationPriorityManager mgr = null; + + @Before + public void before() { + mgr = new DummyCommonApplicationPriorityManager(); + mgr.init(new Configuration()); + mgr.start(); + } + + @After + public void after() { + mgr.stop(); + } + + @Test + public void testAddRemovePrioritylabel() throws Exception { + // mgr.setMaxPriorityValue(-1); + // Add some label + mgr.addToClusterApplicationPriorities(ImmutableSet.of(1)); + assertCollectionEquals(mgr.lastAddedPrioritylabels, Arrays.asList(1)); + + // Add a label w/o integer mapping, it should take incremented value from + // last updated priority integer value. + mgr.addToClusterApplicationPriorities(ImmutableSet.of(3)); + mgr.addToClusterApplicationPriorities(toSet(5, 7)); + assertCollectionEquals(mgr.lastAddedPrioritylabels, Sets.newHashSet(5, 7)); + + Assert.assertTrue(mgr.getClusterPriorities().containsAll( + Sets.newHashSet(1, 3, 5, 7))); + + // try to remove null, empty and non-existed label, should fail + for (Integer p : Arrays.asList(null, 9)) { + boolean caught = false; + try { + mgr.removeFromClusterApplicationPriority(Arrays.asList(p)); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("remove priority should fail " + + "when label is null/empty/non-existed", caught); + } + + // Remove some label + mgr.removeFromClusterApplicationPriority(Arrays.asList(3)); + assertCollectionEquals(mgr.lastRemovedPrioritylabels, Arrays.asList(3)); + Assert.assertTrue(mgr.getClusterPriorities().containsAll( + Arrays.asList(1, 5, 7))); + + mgr.removeFromClusterApplicationPriority(Arrays.asList(1, 5, 7)); + Assert.assertTrue(mgr.lastRemovedPrioritylabels.containsAll(Sets + .newHashSet(1, 5, 7))); + Assert.assertTrue(mgr.getClusterPriorities().isEmpty()); + } + + @Test + public void testAddandSetPrioritylabel() throws Exception { + // Add some label + mgr.addToClusterApplicationPriorities(ImmutableSet.of(1)); + assertCollectionEquals(mgr.lastAddedPrioritylabels, Arrays.asList(1)); + + mgr.addToClusterApplicationPriorities(toSet(2, 3)); + assertCollectionEquals(mgr.lastAddedPrioritylabels, Sets.newHashSet(2, 3)); + + Assert.assertTrue(mgr.getClusterPriorities().containsAll( + Sets.newHashSet(1, 2, 3))); + + // Set non-exiting labels to a queue + boolean caught = false; + try { + mgr.setApplicationPriorityPerQueue("queueA", + ApplicationPriorityPerQueue.newInstance(4, 1)); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("Cannot set label to a Queue " + + "when label is non-existing in Queue", caught); + + // Set some labels to a queue + mgr.setApplicationPriorityPerQueue("queueA", + ApplicationPriorityPerQueue.newInstance(2, 1)); + + Assert.assertTrue(mgr.getApplicationPriorityFromQueue("queueA").equals( + ApplicationPriorityPerQueue.newInstance(2, 1))); + } + + @Test + public void testAddReplaceRemovePriorityLabelsInQueue() throws Exception { + // set a label on a queue, but label doesn't exist + boolean caught = false; + try { + mgr.replaceApplicationPriorityOnQueue(ImmutableMap.of("queueA", + ApplicationPriorityPerQueue.newInstance(2, 1))); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("Trying to replace priority label to a queue but " + + "label doesn't exist in repository should fail", caught); + + // replace queue->pr label one by one with newly added label in cluster + mgr.addToClusterApplicationPriorities(toSet(1, 2, 3)); + mgr.replaceApplicationPriorityOnQueue(ImmutableMap.of("queueA", + ApplicationPriorityPerQueue.newInstance(2, 3))); + mgr.replaceApplicationPriorityOnQueue(ImmutableMap.of("queueB", + ApplicationPriorityPerQueue.newInstance(3, 1))); + assertMapEquals(mgr.getApplicationPrioritiesPerQueue(), ImmutableMap.of( + "queueB", ApplicationPriorityPerQueue.newInstance(3, 1), "queueA", + ApplicationPriorityPerQueue.newInstance(2, 3))); + } + + @Test(timeout = 5000) + public void testRemovePrioritylabelAddedToQueue() throws Exception { + mgr.addToClusterApplicationPriorities(toSet(1, 2, 3)); + mgr.replaceApplicationPriorityOnQueue(ImmutableMap.of("queueA", + ApplicationPriorityPerQueue.newInstance(2, 1))); + mgr.replaceApplicationPriorityOnQueue(ImmutableMap.of("queueB", + ApplicationPriorityPerQueue.newInstance(3, 1))); + + boolean caught = false; + try { + mgr.removeFromClusterApplicationPriority(ImmutableSet.of(2)); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("Trying to remove priority label from cluster but " + + "label exists in a queue, should fail", caught); + + mgr.replaceApplicationPriorityOnQueue(ImmutableMap.of("queueA", + ApplicationPriorityPerQueue.newInstance(1, 3))); + + // Now removal should be a success + mgr.removeFromClusterApplicationPriority(ImmutableSet.of(2)); + assertMapEquals(mgr.getApplicationPrioritiesPerQueue(), ImmutableMap.of( + "queueB", ApplicationPriorityPerQueue.newInstance(3, 1), "queueA", + ApplicationPriorityPerQueue.newInstance(1, 3))); + assertCollectionEquals(mgr.lastRemovedPrioritylabels, Arrays.asList(2)); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/applicationpriority/TestFileSystemApplicationPriorityStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/applicationpriority/TestFileSystemApplicationPriorityStore.java new file mode 100644 index 0000000..caa2a36 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/applicationpriority/TestFileSystemApplicationPriorityStore.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applicationpriority; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationPriorityPerQueue; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; + +public class TestFileSystemApplicationPriorityStore + extends + ApplicationPriorityTestBase { + MockPriorityLabelManager mgr = null; + Configuration conf = null; + + private static class MockPriorityLabelManager + extends + CommonApplicationPriorityManager { + @Override + protected void initDispatcher(Configuration conf) { + super.dispatcher = new InlineDispatcher(); + } + + @Override + protected void startDispatcher() { + // do nothing + } + + @Override + protected void stopDispatcher() { + // do nothing + } + } + + private FileSystemApplicationPriorityStore getStore() { + return (FileSystemApplicationPriorityStore) mgr.store; + } + + @Before + public void before() throws IOException { + mgr = new MockPriorityLabelManager(); + conf = new Configuration(); + File tempDir = File.createTempFile("nlb", ".tmp"); + tempDir.delete(); + tempDir.mkdirs(); + tempDir.deleteOnExit(); + conf.set(YarnConfiguration.FS_APP_PRIORITY_STORE_ROOT_DIR, + tempDir.getAbsolutePath()); + mgr.init(conf); + mgr.start(); + } + + @After + public void after() throws IOException { + getStore().fs.delete(getStore().fsWorkingPath, true); + mgr.stop(); + } + + @Test + public void testRecoverWithMirror() throws Exception { + mgr.addToClusterApplicationPriorities(toSet(1, 2, 3)); + mgr.addToClusterApplicationPriorities(toSet(4)); + mgr.addToClusterApplicationPriorities(toSet(5)); + mgr.setApplicationPriorityPerQueue("queueA", + ApplicationPriorityPerQueue.newInstance(3, 2)); + mgr.replaceApplicationPriorityOnQueue(ImmutableMap.of("queueB", + ApplicationPriorityPerQueue.newInstance(4, 2), "queueC", + ApplicationPriorityPerQueue.newInstance(5, 3))); + Assert.assertEquals(5, mgr.getClusterPriorities().size()); + + mgr.removeFromClusterApplicationPriority(toSet(1)); + + // shutdown mgr and start a new mgr + mgr.stop(); + + mgr = new MockPriorityLabelManager(); + mgr.init(conf); + + // check variables + Assert.assertEquals(4, mgr.getClusterPriorities().size()); + Assert.assertTrue(mgr.getClusterPriorities().containsAll( + Arrays.asList(2, 3, 4, 5))); + + assertMapContains(mgr.getApplicationPrioritiesPerQueue(), ImmutableMap.of( + "queueB", ApplicationPriorityPerQueue.newInstance(4, 2), "queueC", + ApplicationPriorityPerQueue.newInstance(5, 3), "queueA", + ApplicationPriorityPerQueue.newInstance(3, 2))); + + // shutdown mgr and start a new mgr + mgr.stop(); + mgr = new MockPriorityLabelManager(); + mgr.init(conf); + + // check variables + Assert.assertEquals(4, mgr.getClusterPriorities().size()); + Assert.assertTrue(mgr.getClusterPriorities().containsAll( + Arrays.asList(2, 3, 4, 5))); + + assertMapContains(mgr.getApplicationPrioritiesPerQueue(), ImmutableMap.of( + "queueB", ApplicationPriorityPerQueue.newInstance(4, 2), "queueC", + ApplicationPriorityPerQueue.newInstance(5, 3), "queueA", + ApplicationPriorityPerQueue.newInstance(3, 2))); + mgr.stop(); + } + + @Test(timeout = 10000) + public void testEditlogRecover() throws Exception { + mgr.addToClusterApplicationPriorities(toSet(1, 2, 3)); + mgr.addToClusterApplicationPriorities(toSet(4)); + mgr.addToClusterApplicationPriorities(toSet(5)); + mgr.setApplicationPriorityPerQueue("queueA", + ApplicationPriorityPerQueue.newInstance(3, 2)); + mgr.replaceApplicationPriorityOnQueue(ImmutableMap.of("queueB", + ApplicationPriorityPerQueue.newInstance(4, 2), "queueC", + ApplicationPriorityPerQueue.newInstance(5, 3))); + Assert.assertEquals(5, mgr.getClusterPriorities().size()); + + mgr.removeFromClusterApplicationPriority(toSet(1)); + /* + * After removed queueA: medium,low queueB: high,low queueC: v_high,medium + */ + + // shutdown mgr and start a new mgr + mgr.stop(); + + mgr = new MockPriorityLabelManager(); + mgr.init(conf); + + // check variables + Assert.assertEquals(4, mgr.getClusterPriorities().size()); + Assert.assertTrue(mgr.getClusterPriorities().containsAll( + Arrays.asList(2, 3, 4, 5))); + + assertMapContains(mgr.getApplicationPrioritiesPerQueue(), ImmutableMap.of( + "queueB", ApplicationPriorityPerQueue.newInstance(4, 2), "queueC", + ApplicationPriorityPerQueue.newInstance(5, 3), "queueA", + ApplicationPriorityPerQueue.newInstance(3, 2))); + mgr.stop(); + } + + @Test + // (timeout = 10000) + public void testSerilizationAfterRecovery() throws Exception { + mgr.addToClusterApplicationPriorities(toSet(1, 2, 3)); + mgr.addToClusterApplicationPriorities(toSet(4)); + mgr.setApplicationPriorityPerQueue("queueA", + ApplicationPriorityPerQueue.newInstance(3, 2)); + mgr.replaceApplicationPriorityOnQueue(ImmutableMap.of("queueB", + ApplicationPriorityPerQueue.newInstance(4, 2))); + + mgr.removeFromClusterApplicationPriority(toSet(1)); + + // shutdown mgr and start a new mgr + mgr.stop(); + + mgr = new MockPriorityLabelManager(); + mgr.init(conf); + mgr.start(); + + // check variables + Assert.assertEquals(3, mgr.getClusterPriorities().size()); + Assert.assertTrue(mgr.getClusterPriorities().containsAll( + Arrays.asList(2, 3, 4))); + + assertMapContains(mgr.getApplicationPrioritiesPerQueue(), ImmutableMap.of( + "queueB", ApplicationPriorityPerQueue.newInstance(4, 2), "queueA", + ApplicationPriorityPerQueue.newInstance(3, 2))); + + /* + * Add label v_high and set to queueC then shutdown + */ + mgr = new MockPriorityLabelManager(); + mgr.init(conf); + mgr.start(); + mgr.addToClusterApplicationPriorities(toSet(5)); + mgr.setApplicationPriorityPerQueue("queueC", + ApplicationPriorityPerQueue.newInstance(5, 2)); + mgr.stop(); + + /* + * Recovery, and see if v_high is added in cluter and queueC + */ + mgr = new MockPriorityLabelManager(); + mgr.init(conf); + mgr.start(); + + // check variables + Assert.assertEquals(4, mgr.getClusterPriorities().size()); + Assert.assertTrue(mgr.getClusterPriorities().containsAll( + Arrays.asList(2, 3, 4, 5))); + assertMapContains(mgr.getApplicationPrioritiesPerQueue(), ImmutableMap.of( + "queueB", ApplicationPriorityPerQueue.newInstance(4, 2), "queueA", + ApplicationPriorityPerQueue.newInstance(3, 2), "queueC", + ApplicationPriorityPerQueue.newInstance(5, 2))); + mgr.stop(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/RMApplicationPriorityManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/RMApplicationPriorityManager.java new file mode 100644 index 0000000..0e354e6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationpriority/RMApplicationPriorityManager.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.applicationpriority; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.applicationpriority.CommonApplicationPriorityManager; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; + + +public class RMApplicationPriorityManager extends CommonApplicationPriorityManager { + + protected AccessControlList adminAcl; + private RMContext rmContext = null; + private Configuration conf = null; + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + this.conf = conf; + adminAcl = new AccessControlList(conf.get(YarnConfiguration.YARN_ADMIN_ACL, + YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + + // yarn-site.xml can have cluster priority label config, read and update. + String[] configPrLabels = conf + .getTrimmedStrings(YarnConfiguration.YARN_CLUSTER_APP_PRIORITY); + } + + public RMContext getRMContext() { + return this.rmContext; + } + + public void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } + + public boolean checkAdminAccess(UserGroupInformation user) { + // make sure only admin can invoke + // this method + if (adminAcl.isUserAllowed(user)) { + return true; + } + return false; + } +} -- 1.9.4.msysgit.1