diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 03bdd3a..d9a1d5c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -109,6 +109,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.CSConfigurationProvider; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.FileBasedCSConfigurationProvider; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.StoreBasedCSConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; @@ -167,6 +170,7 @@ private int offswitchPerHeartbeatLimit; + private CSConfigurationProvider csConfProvider; @Override public void setConf(Configuration conf) { @@ -290,7 +294,21 @@ void initScheduler(Configuration configuration) throws IOException { try { writeLock.lock(); - this.conf = loadCapacitySchedulerConfiguration(configuration); + String confProviderStr = configuration.get( + CapacitySchedulerConfiguration.CS_CONF_PROVIDER, + CapacitySchedulerConfiguration.FILE_CS_CONF_PROVIDER); + if (confProviderStr.equals( + CapacitySchedulerConfiguration.FILE_CS_CONF_PROVIDER)) { + this.csConfProvider = new FileBasedCSConfigurationProvider(rmContext); + } else if (confProviderStr.equals( + CapacitySchedulerConfiguration.DERBY_CS_CONF_PROVIDER)) { + this.csConfProvider = new StoreBasedCSConfigurationProvider(rmContext); + } else { + throw new IOException("Invalid CS configuration provider: " + + confProviderStr); + } + this.csConfProvider.init(configuration); + this.conf = this.csConfProvider.loadConfiguration(configuration); validateConf(this.conf); this.minimumAllocation = this.conf.getMinimumAllocation(); initMaximumResourceCapability(this.conf.getMaximumAllocation()); @@ -397,7 +415,7 @@ public void reinitialize(Configuration newConf, RMContext rmContext) writeLock.lock(); Configuration configuration = new Configuration(newConf); CapacitySchedulerConfiguration oldConf = this.conf; - this.conf = loadCapacitySchedulerConfiguration(configuration); + this.conf = csConfProvider.loadConfiguration(configuration); validateConf(this.conf); try { LOG.info("Re-initializing queues..."); @@ -1856,23 +1874,6 @@ public boolean isSystemAppsLimitReached() { return true; } - private CapacitySchedulerConfiguration loadCapacitySchedulerConfiguration( - Configuration configuration) throws IOException { - try { - InputStream CSInputStream = - this.rmContext.getConfigurationProvider() - .getConfigurationInputStream(configuration, - YarnConfiguration.CS_CONFIGURATION_FILE); - if (CSInputStream != null) { - configuration.addResource(CSInputStream); - return new CapacitySchedulerConfiguration(configuration, false); - } - return new CapacitySchedulerConfiguration(configuration, true); - } catch (Exception e) { - throw new IOException(e); - } - } - private String getDefaultReservationQueueName(String planQueueName) { return planQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 43ec390..43c0aef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -285,6 +285,16 @@ @Private public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false; + @Private + public static final String CS_CONF_PROVIDER = PREFIX + + "configuration.provider"; + + @Private + public static final String FILE_CS_CONF_PROVIDER = "file"; + + @Private + public static final String DERBY_CS_CONF_PROVIDER = "derby"; + AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser(); public CapacitySchedulerConfiguration() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/CSConfigurationProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/CSConfigurationProvider.java new file mode 100644 index 0000000..b7a8b41 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/CSConfigurationProvider.java @@ -0,0 +1,28 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import java.io.IOException; + +/** + * Configuration provider for {@link CapacityScheduler}. + */ +public interface CSConfigurationProvider { + + /** + * Initialize the configuration provider with given conf. + * @param conf configuration to initialize with + */ + void init(Configuration conf); + + /** + * Loads capacity scheduler configuration object. + * @param conf initial bootstrap configuration + * @return CS configuration + * @throws IOException if fail to retrieve configuration + */ + CapacitySchedulerConfiguration loadConfiguration(Configuration conf) + throws IOException; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FileBasedCSConfigurationProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FileBasedCSConfigurationProvider.java new file mode 100644 index 0000000..eff79db --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FileBasedCSConfigurationProvider.java @@ -0,0 +1,48 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import java.io.IOException; +import java.io.InputStream; + +/** + * {@link CapacityScheduler} configuration provider based on local + * {@code capacity-scheduler.xml} file. + */ +public class FileBasedCSConfigurationProvider implements CSConfigurationProvider { + + private RMContext rmContext; + + /** + * Construct file based CS configuration provider with given context. + * @param rmContext the RM context + */ + public FileBasedCSConfigurationProvider(RMContext rmContext) { + this.rmContext = rmContext; + } + + @Override + public void init(Configuration conf) {} + + @Override + public CapacitySchedulerConfiguration loadConfiguration(Configuration conf) + throws IOException { + try { + InputStream CSInputStream = + this.rmContext.getConfigurationProvider() + .getConfigurationInputStream(conf, + YarnConfiguration.CS_CONFIGURATION_FILE); + if (CSInputStream != null) { + conf.addResource(CSInputStream); + return new CapacitySchedulerConfiguration(conf, false); + } + return new CapacitySchedulerConfiguration(conf, true); + } catch (Exception e) { + throw new IOException(e); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/StoreBasedCSConfigurationProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/StoreBasedCSConfigurationProvider.java new file mode 100644 index 0000000..e6b75cf --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/StoreBasedCSConfigurationProvider.java @@ -0,0 +1,41 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.store.DerbyConfigurationStore; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.store.YarnConfigurationStore; + +/** + * {@link CapacityScheduler} configuration provider based on persistent storage. + */ +public class StoreBasedCSConfigurationProvider implements CSConfigurationProvider { + + private RMContext rmContext; + private YarnConfigurationStore confStore; + + /** + * Constructs store based CS configuration provider with given context. + * @param rmContext the RM context + */ + public StoreBasedCSConfigurationProvider(RMContext rmContext) { + this.rmContext = rmContext; + } + + @Override + public void init(Configuration conf) { + String confProviderStr = conf.get( + CapacitySchedulerConfiguration.CS_CONF_PROVIDER, + CapacitySchedulerConfiguration.DERBY_CS_CONF_PROVIDER); + if (confProviderStr.equals( + CapacitySchedulerConfiguration.DERBY_CS_CONF_PROVIDER)) { + this.confStore = new DerbyConfigurationStore(); + } + } + + @Override + public CapacitySchedulerConfiguration loadConfiguration(Configuration conf) { + return null; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/store/DerbyConfigurationStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/store/DerbyConfigurationStore.java new file mode 100644 index 0000000..7b10781 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/store/DerbyConfigurationStore.java @@ -0,0 +1,4 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.store; + +public class DerbyConfigurationStore implements YarnConfigurationStore { +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/store/YarnConfigurationStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/store/YarnConfigurationStore.java new file mode 100644 index 0000000..8139904 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/store/YarnConfigurationStore.java @@ -0,0 +1,4 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.store; + +public interface YarnConfigurationStore { +}