diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 4799137..61a163a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -698,6 +698,9 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String LEVELDB_CONFIGURATION_STORE = "leveldb";
@Private
@Unstable
+ public static final String HDFS_CONFIGURATION_STORE = "hdfs";
+ @Private
+ @Unstable
public static final String ZK_CONFIGURATION_STORE = "zk";
@Private
@Unstable
@@ -705,6 +708,14 @@ public static boolean isAclEnabled(Configuration conf) {
FILE_CONFIGURATION_STORE;
@Private
@Unstable
+ public static final String SCHEDULER_CONFIGURATION_HFDS_PATH = YARN_PREFIX
+ + "scheduler.configuration.hdfs.path";
+ @Private
+ @Unstable
+ public static final String SCHEDULER_CONFIGURATION_MAX_VERSION = YARN_PREFIX
+ + "scheduler.configuration.max.version";
+ @Private
+ @Unstable
public static final String RM_SCHEDCONF_STORE_PATH = YARN_PREFIX
+ "scheduler.configuration.leveldb-store.path";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e90d0f2..db1cd85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3460,6 +3460,23 @@
+ HDFS path for the configuration store when using the hdfs-based
+ configuration store.
+
+ yarn.scheduler.configuration.hdfs.path
+ /confstore
+
+
+
+
+ maximum number of scheduler configuration versions retained in HDFS
+
+ yarn.scheduler.configuration.max.version
+ 100
+
+
+
+
Provides an option for client to load supported resource types from RM
instead of depending on local resource-types.xml file.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6f630f8..8037a48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -306,6 +306,7 @@ void initScheduler(Configuration configuration) throws
case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
case YarnConfiguration.ZK_CONFIGURATION_STORE:
+ case YarnConfiguration.HDFS_CONFIGURATION_STORE:
this.csConfProvider = new MutableCSConfigurationProvider(rmContext);
break;
default:
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/HDFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/HDFSSchedulerConfigurationStore.java
new file mode 100644
index 0000000..605259d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/HDFSSchedulerConfigurationStore.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.YARN_PREFIX;
+
+/**
+ * An HDFS-backed implementation of {@link YarnConfigurationStore}. Offers
+ * scheduler configuration storage in HDFS.
+ */
+public class HDFSSchedulerConfigurationStore extends YarnConfigurationStore {
+ public static final Log LOG = LogFactory.getLog(HDFSSchedulerConfigurationStore.class);
+
+ @VisibleForTesting
+ protected static final Version CURRENT_VERSION_INFO = Version
+ .newInstance(0, 1);
+
+ private static final String TMP = ".tmp";
+
+ private int maxVersion;
+ private Path schedulerConfDir;
+ private FileSystem fileSystem;
+ private LogMutation pendingMutation;
+ private PathFilter configFilePathFileter;
+ private volatile Configuration schedConf;
+
+ @Override
+ public void initialize(Configuration conf, Configuration schedConf,
+ RMContext rmContext) throws Exception {
+ this.configFilePathFileter = new PathFilter() {
+ @Override public boolean accept(Path path) {
+ String pathName = path.getName();
+ return pathName.startsWith(
+ YarnConfiguration.CS_CONFIGURATION_FILE) && !pathName.endsWith(TMP) ;
+ }
+ };
+
+ String schedulerConfPathStr = conf.get(YarnConfiguration.SCHEDULER_CONFIGURATION_HFDS_PATH);
+ if (schedulerConfPathStr == null || schedulerConfPathStr.isEmpty()) {
+ throw new RuntimeException(
+ YarnConfiguration.SCHEDULER_CONFIGURATION_HFDS_PATH + " must be set");
+ }
+ this.schedulerConfDir = new Path(schedulerConfPathStr);
+ this.fileSystem = this.schedulerConfDir.getFileSystem(conf);
+ this.maxVersion = conf.getInt(YarnConfiguration.SCHEDULER_CONFIGURATION_MAX_VERSION, 100);
+ LOG.info("schedulerConfDir=" + schedulerConfPathStr);
+ LOG.info("capacity scheduler file max version = " + maxVersion);
+
+ if (!fileSystem.exists(schedulerConfDir)) {
+ if (!fileSystem.mkdirs(schedulerConfDir)) {
+ throw new IOException("mkdir " + schedulerConfPathStr + " failed");
+ }
+ }
+
+ // create an initial capacity-scheduler.xml.<timestamp> file if none exists
+ if (this.getConfigFileInputStream() == null) {
+ writeConfigurationToHdfs(schedConf);
+ }
+
+ this.schedConf = this.getConfigurationFromHdfs();
+ }
+
+ // TODO: persist a mutation log; pending updates may be lost if HDFS is
+ // unreachable or the ResourceManager stops before they are written
+ @Override
+ public synchronized void logMutation(LogMutation logMutation) {
+ pendingMutation = logMutation;
+ LOG.info(new GsonBuilder().serializeNulls().create().toJson(logMutation));
+ }
+
+ @Override
+ protected Version getConfStoreVersion() throws Exception {
+ return null;
+ }
+
+ @Override
+ protected void storeVersion() throws Exception {
+
+ }
+
+ @Override
+ protected Version getCurrentVersion() {
+ return null;
+ }
+
+ @Override
+ public void checkVersion() {
+ // Does nothing; version checking is not implemented for this store.
+ }
+
+ @Override
+ public List getConfirmedConfHistory(long fromId) {
+ return null; // unimplemented
+ }
+
+ @Override
+ public synchronized void confirmMutation(boolean isValid)
+ throws IOException {
+ if (isValid) {
+ for (Map.Entry kv : pendingMutation.getUpdates().entrySet()) {
+ if (kv.getValue() == null) {
+ this.schedConf.unset(kv.getKey());
+ } else {
+ this.schedConf.set(kv.getKey(), kv.getValue());
+ }
+ }
+ writeConfigurationToHdfs(schedConf);
+ }
+ pendingMutation = null;
+ }
+
+ @Override
+ public synchronized Configuration retrieve() throws IOException {
+ return getConfigurationFromHdfs();
+ }
+
+ private Configuration getConfigurationFromHdfs() throws IOException {
+ long start = Time.monotonicNow();
+
+ Configuration conf = new Configuration(false);
+ InputStream configInputStream = getConfigFileInputStream();
+ if (configInputStream == null) {
+ throw new IOException(
+ "no capacity scheduler file in " + this.schedulerConfDir);
+ }
+
+ conf.addResource(configInputStream);
+ Configuration result = new Configuration(false);
+ for (Map.Entry entry : conf) {
+ result.set(entry.getKey(), entry.getValue());
+ }
+ LOG.info("upload conf from hdfs took " + (Time.monotonicNow() - start) + " ms");
+ this.schedConf = result;
+ return result;
+ }
+
+ private InputStream getConfigFileInputStream() throws IOException {
+ Path lastestConfigPath = getLatestConfigPath();
+ if (lastestConfigPath == null) {
+ return null;
+ }
+ return fileSystem.open(lastestConfigPath);
+ }
+
+ private Path getLatestConfigPath() throws IOException {
+ FileStatus[] fileStatuses = fileSystem.listStatus(this.schedulerConfDir,
+ this.configFilePathFileter);
+
+ if (fileStatuses == null || fileStatuses.length == 0) {
+ return null;
+ }
+ Arrays.sort(fileStatuses);
+
+ return fileStatuses[fileStatuses.length - 1].getPath();
+ }
+
+
+ @VisibleForTesting
+ void writeConfigurationToHdfs(Configuration schedConf) {
+ long start = Time.monotonicNow();
+ while (true) {
+ //use a different filename on each retry so a failed write is not reused
+ String schedulerConfigFile = YarnConfiguration.CS_CONFIGURATION_FILE + "."
+ + System.currentTimeMillis();
+ String tmpSchedulerConfigFile = YarnConfiguration.CS_CONFIGURATION_FILE
+ + "." + System.currentTimeMillis() + TMP;
+
+ Path schdulerConfigFilePath = new Path(this.schedulerConfDir,
+ schedulerConfigFile);
+ Path tmpSchdulerConfigFilePath = new Path(this.schedulerConfDir,
+ tmpSchedulerConfigFile);
+
+ try {
+ //clean up old configuration files when their number exceeds maxVersion
+ cleanConfigurationFile();
+ FSDataOutputStream outputStream = fileSystem.create(
+ tmpSchdulerConfigFilePath);
+ schedConf.writeXml(outputStream);
+ outputStream.close();
+ fileSystem.rename(tmpSchdulerConfigFilePath, schdulerConfigFilePath);
+
+ LOG.info(
+ "write capacity configuration successfully, schedulerConfigFile="
+ + schedulerConfigFile);
+ break;
+ } catch (Exception e) {
+ LOG.info("write cacpacity configuration fail, schedulerConfigFile="
+ + schedulerConfigFile);
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ LOG.info("write conf to hdfs took " + (Time.monotonicNow() - start) + " ms");
+ }
+
+ private void cleanConfigurationFile() throws IOException {
+ FileStatus[] fileStatuses = fileSystem.listStatus(this.schedulerConfDir,
+ this.configFilePathFileter);
+
+ if (fileStatuses == null || fileStatuses.length <= this.maxVersion) {
+ return;
+ }
+ Arrays.sort(fileStatuses);
+ int configFileNum = fileStatuses.length;
+ if (fileStatuses.length > this.maxVersion) {
+ for (int i = 0; i < configFileNum - this.maxVersion; i++) {
+ fileSystem.delete(fileStatuses[i].getPath(), false);
+ LOG.info("delete config file " + fileStatuses[i].getPath());
+ }
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index 40a19a4..17e70ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -77,6 +77,9 @@ public void init(Configuration config) throws IOException {
case YarnConfiguration.ZK_CONFIGURATION_STORE:
this.confStore = new ZKConfigurationStore();
break;
+ case YarnConfiguration.HDFS_CONFIGURATION_STORE:
+ this.confStore = new HDFSSchedulerConfigurationStore();
+ break;
default:
this.confStore = YarnConfigurationStoreFactory.getStore(config);
break;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index 7fb52fc..ef0a44b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -123,7 +123,7 @@ public void close() throws IOException {}
* Retrieve the persisted configuration.
* @return configuration as key-value
*/
- public abstract Configuration retrieve();
+ public abstract Configuration retrieve() throws IOException;
/**
* Get a list of confirmed configuration mutations starting from a given id.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestHDFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestHDFSSchedulerConfigurationStore.java
new file mode 100644
index 0000000..6162a88
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestHDFSSchedulerConfigurationStore.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .MutableConfigurationProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf
+ .YarnConfigurationStore.LogMutation;
+import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
+import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestHDFSSchedulerConfigurationStore extends ConfigurationStoreBaseTest{
+ private TestingServer curatorTestingServer;
+ private File testSchedulerConfigurationDir;
+
+ @Override
+ protected YarnConfigurationStore createConfStore() {
+ return new HDFSSchedulerConfigurationStore();
+ }
+
+ public static TestingServer setupCuratorServer() throws Exception {
+ TestingServer curatorTestingServer = new TestingServer();
+ curatorTestingServer.start();
+ return curatorTestingServer;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ curatorTestingServer = setupCuratorServer();
+ testSchedulerConfigurationDir = new File(
+ TestHDFSSchedulerConfigurationStore.class.getResource("").getPath()
+ + HDFSSchedulerConfigurationStore.class.getSimpleName());
+ testSchedulerConfigurationDir.mkdirs();
+
+ conf = new Configuration();
+ conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_HFDS_PATH,
+ testSchedulerConfigurationDir.getAbsolutePath());
+ }
+
+ private void writeConf(Configuration conf) throws IOException {
+ FileSystem fileSystem = FileSystem.get(new Configuration(conf));
+ String schedulerConfigurationFile = YarnConfiguration.CS_CONFIGURATION_FILE
+ + "." + System.currentTimeMillis();
+ FSDataOutputStream outputStream = fileSystem.create(
+ new Path(testSchedulerConfigurationDir.getAbsolutePath(),
+ schedulerConfigurationFile));
+ conf.writeXml(outputStream);
+ outputStream.close();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ FileUtils.deleteDirectory(testSchedulerConfigurationDir);
+ }
+
+ @Test
+ public void initialize()
+ throws Exception {
+ confStore.initialize(conf, new Configuration(false), null);
+ confStore.retrieve().writeXml(System.out);
+ }
+
+ @Test
+ public void confirmMutationWithValid() throws Exception {
+ conf.setInt(YarnConfiguration.SCHEDULER_CONFIGURATION_MAX_VERSION, 2);
+ conf.set("a", "a");
+ conf.set("b", "b");
+ conf.set("c", "c");
+ writeConf(conf);
+ confStore.initialize(conf, conf, null);
+
+ Map updates = new HashMap<>();
+ updates.put("a", null);
+ updates.put("b", "bb");
+
+ Configuration expectConfig = new Configuration(conf);
+ expectConfig.unset("a");
+ expectConfig.set("b", "bb");
+
+ LogMutation logMutation = new LogMutation(updates, "test");
+
+ confStore.logMutation(logMutation);
+ confStore.confirmMutation(true);
+ Configuration storeConf = confStore.retrieve();
+ assertEquals(null, storeConf.get("a"));
+ assertEquals("bb", storeConf.get("b"));
+ assertEquals("c", storeConf.get("c"));
+
+ compareConfig(expectConfig, storeConf);
+
+ updates.put("b", "bbb");
+
+ confStore.logMutation(logMutation);
+ confStore.confirmMutation(true);
+ storeConf = confStore.retrieve();
+ assertEquals(null, storeConf.get("a"));
+ assertEquals("bbb", storeConf.get("b"));
+ assertEquals("c", storeConf.get("c"));
+
+
+ }
+
+ @Test
+ public void confirmMutationWithInValid() throws Exception {
+ conf.set("a", "a");
+ conf.set("b", "b");
+ conf.set("c", "c");
+ writeConf(conf);
+ confStore.initialize(conf, conf, null);
+
+ Map updates = new HashMap<>();
+ updates.put("a", null);
+ updates.put("b", "bb");
+
+ LogMutation logMutation = new LogMutation(updates, "test");
+
+ confStore.confirmMutation(false);
+ Configuration storeConf = confStore.retrieve();
+
+ compareConfig(conf, storeConf);
+ }
+
+ @Test
+ public void retrieve() throws Exception {
+ Configuration schedulerConf = new Configuration();
+ schedulerConf.set("a", "a");
+ schedulerConf.setLong("long", 1L);
+ schedulerConf.setBoolean("boolean", true);
+ writeConf(schedulerConf);
+
+ confStore.initialize(conf, conf, null);
+ Configuration storedConfig = confStore.retrieve();
+
+ compareConfig(schedulerConf, storedConfig);
+ }
+
+ private void compareConfig(Configuration schedulerConf,
+ Configuration storedConfig) {
+ for (Map.Entry entry : schedulerConf) {
+ assertEquals(entry.getKey(), schedulerConf.get(entry.getKey()),
+ storedConfig.get(entry.getKey()));
+ }
+
+ for (Map.Entry entry : storedConfig) {
+ assertEquals(entry.getKey(), storedConfig.get(entry.getKey()),
+ schedulerConf.get(entry.getKey()));
+ }
+ }
+
+ public Configuration createRMHAConf(String rmIds, String rmId,
+ int adminPort) {
+ Configuration conf = new YarnConfiguration();
+ this.conf.setClass(YarnConfiguration.RM_SCHEDULER,
+ CapacityScheduler.class, CapacityScheduler.class);
+ conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+ conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+ YarnConfiguration.HDFS_CONFIGURATION_STORE);
+ conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_HFDS_PATH,
+ testSchedulerConfigurationDir.getAbsolutePath());
+ conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+ curatorTestingServer.getConnectString());
+ conf.set(YarnConfiguration.RM_HA_ID, rmId);
+ conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
+ conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+ for (String rpcAddress :
+ YarnConfiguration.getServiceAddressConfKeys(conf)) {
+ for (String id : HAUtil.getRMHAIds(conf)) {
+ conf.set(HAUtil.addSuffix(rpcAddress, id), "localhost:0");
+ }
+ }
+ conf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId),
+ "localhost:" + adminPort);
+ return conf;
+ }
+
+ /**
+ * When failing over, new active RM should read from current state of store,
+ * including any updates when the new active RM was in standby.
+ * @throws Exception
+ */
+ @Test
+ public void testFailoverReadsFromUpdatedStore() throws Exception {
+ HAServiceProtocol.StateChangeRequestInfo req =
+ new HAServiceProtocol.StateChangeRequestInfo(
+ HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+ Configuration conf1 = createRMHAConf("rm1,rm2", "rm1", 1234);
+ ResourceManager rm1 = new MockRM(conf1);
+ rm1.start();
+ rm1.getRMContext().getRMAdminService().transitionToActive(req);
+ assertEquals("RM with ZKStore didn't start",
+ Service.STATE.STARTED, rm1.getServiceState());
+ assertEquals("RM should be Active",
+ HAServiceProtocol.HAServiceState.ACTIVE,
+ rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+ assertNull(((MutableConfScheduler) rm1.getResourceScheduler())
+ .getConfiguration().get("key"));
+
+ Configuration conf2 = createRMHAConf("rm1,rm2", "rm2", 5678);
+ ResourceManager rm2 = new MockRM(conf2);
+ rm2.start();
+ assertEquals("RM should be Standby",
+ HAServiceProtocol.HAServiceState.STANDBY,
+ rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+ // Update configuration on RM1
+ SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
+ schedConfUpdateInfo.getGlobalParams().put("key", "val");
+ MutableConfigurationProvider confProvider = ((MutableConfScheduler)
+ rm1.getResourceScheduler()).getMutableConfProvider();
+ UserGroupInformation user = UserGroupInformation
+ .createUserForTesting(TEST_USER, new String[0]);
+ confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
+ assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
+ .getConfiguration().get("key"));
+ confProvider.confirmPendingMutation(true);
+ assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
+ .getConfStore().retrieve().get("key"));
+ // Next update is not persisted, it should not be recovered
+ schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
+ confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+
+ // Start RM2 and verifies it starts with updated configuration
+ rm2.getRMContext().getRMAdminService().transitionToActive(req);
+ assertEquals("RM with ZKStore didn't start",
+ Service.STATE.STARTED, rm2.getServiceState());
+ assertEquals("RM should be Active",
+ HAServiceProtocol.HAServiceState.ACTIVE,
+ rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+ for (int i = 0; i < 10000 / 50; i++) {
+ if (HAServiceProtocol.HAServiceState.ACTIVE ==
+ rm1.getRMContext().getRMAdminService().getServiceStatus()
+ .getState()) {
+ Thread.sleep(100);
+ }
+ }
+ assertEquals("RM should have been fenced",
+ HAServiceProtocol.HAServiceState.STANDBY,
+ rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+ assertEquals("RM should be Active",
+ HAServiceProtocol.HAServiceState.ACTIVE,
+ rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+ assertEquals("val", ((MutableCSConfigurationProvider) (
+ (CapacityScheduler) rm2.getResourceScheduler())
+ .getMutableConfProvider()).getConfStore().retrieve().get("key"));
+ assertEquals("val", ((MutableConfScheduler) rm2.getResourceScheduler())
+ .getConfiguration().get("key"));
+ // Transition to standby will set RM's HA status and then reinitialize in
+ // a separate thread. Despite asserting for STANDBY state, it's
+ // possible for reinitialization to be unfinished. Wait here for it to
+ // finish, otherwise closing rm1 will close zkManager and the unfinished
+ // reinitialization will throw an exception.
+ Thread.sleep(10000);
+ rm1.close();
+ rm2.close();
+ }
+
+ /**
+ * When failing over, if RM1 stopped and removed a queue that RM2 has in
+ * memory, failing over to RM2 should not throw an exception.
+ * @throws Exception
+ */
+ @Test
+ public void testFailoverAfterRemoveQueue() throws Exception {
+ HAServiceProtocol.StateChangeRequestInfo req =
+ new HAServiceProtocol.StateChangeRequestInfo(
+ HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+ Configuration conf1 = createRMHAConf("rm1,rm2", "rm1", 1234);
+ ResourceManager rm1 = new MockRM(conf1);
+ rm1.start();
+ rm1.getRMContext().getRMAdminService().transitionToActive(req);
+ assertEquals("RM with ZKStore didn't start",
+ Service.STATE.STARTED, rm1.getServiceState());
+ assertEquals("RM should be Active",
+ HAServiceProtocol.HAServiceState.ACTIVE,
+ rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+ Configuration conf2 = createRMHAConf("rm1,rm2", "rm2", 5678);
+ ResourceManager rm2 = new MockRM(conf2);
+ rm2.start();
+ assertEquals("RM should be Standby",
+ HAServiceProtocol.HAServiceState.STANDBY,
+ rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+ UserGroupInformation user = UserGroupInformation
+ .createUserForTesting(TEST_USER, new String[0]);
+ MutableConfigurationProvider confProvider = ((MutableConfScheduler)
+ rm1.getResourceScheduler()).getMutableConfProvider();
+ // Add root.a
+ SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
+ Map addParams = new HashMap<>();
+ addParams.put("capacity", "100");
+ QueueConfigInfo addInfo = new QueueConfigInfo("root.a", addParams);
+ schedConfUpdateInfo.getAddQueueInfo().add(addInfo);
+ // Stop root.default
+ Map stopParams = new HashMap<>();
+ stopParams.put("state", "STOPPED");
+ stopParams.put("capacity", "0");
+ QueueConfigInfo stopInfo = new QueueConfigInfo("root.default", stopParams);
+ schedConfUpdateInfo.getUpdateQueueInfo().add(stopInfo);
+ confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
+ confProvider.confirmPendingMutation(true);
+ assertTrue(Arrays.asList(((MutableConfScheduler) rm1.getResourceScheduler())
+ .getConfiguration().get("yarn.scheduler.capacity.root.queues").split
+ (",")).contains("a"));
+
+ // Remove root.default
+ schedConfUpdateInfo.getUpdateQueueInfo().clear();
+ schedConfUpdateInfo.getAddQueueInfo().clear();
+ schedConfUpdateInfo.getRemoveQueueInfo().add("root.default");
+ confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
+ confProvider.confirmPendingMutation(true);
+ assertEquals("a", ((MutableConfScheduler) rm1.getResourceScheduler())
+ .getConfiguration().get("yarn.scheduler.capacity.root.queues"));
+
+ // Start RM2 and verifies it starts with updated configuration
+ rm2.getRMContext().getRMAdminService().transitionToActive(req);
+ assertEquals("RM with ZKStore didn't start",
+ Service.STATE.STARTED, rm2.getServiceState());
+ assertEquals("RM should be Active",
+ HAServiceProtocol.HAServiceState.ACTIVE,
+ rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+ for (int i = 0; i < 10000 / 50; i++) {
+ if (HAServiceProtocol.HAServiceState.ACTIVE ==
+ rm1.getRMContext().getRMAdminService().getServiceStatus()
+ .getState()) {
+ Thread.sleep(100);
+ }
+ }
+ assertEquals("RM should have been fenced",
+ HAServiceProtocol.HAServiceState.STANDBY,
+ rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+ assertEquals("RM should be Active",
+ HAServiceProtocol.HAServiceState.ACTIVE,
+ rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+ assertEquals("a", ((MutableCSConfigurationProvider) (
+ (CapacityScheduler) rm2.getResourceScheduler())
+ .getMutableConfProvider()).getConfStore().retrieve()
+ .get("yarn.scheduler.capacity.root.queues"));
+ assertEquals("a", ((MutableConfScheduler) rm2.getResourceScheduler())
+ .getConfiguration().get("yarn.scheduler.capacity.root.queues"));
+ // Transition to standby will set RM's HA status and then reinitialize in
+ // a separate thread. Despite asserting for STANDBY state, it's
+ // possible for reinitialization to be unfinished. Wait here for it to
+ // finish, otherwise closing rm1 will close zkManager and the unfinished
+ // reinitialization will throw an exception.
+ Thread.sleep(10000);
+ rm1.close();
+ rm2.close();
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
index 5d43ebb..1d3df14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
@@ -18,7 +18,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
@@ -30,11 +34,16 @@
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -99,4 +108,48 @@ public void testInMemoryBackedProvider() throws Exception {
assertNull(confProvider.loadConfiguration(conf).get(
"yarn.scheduler.capacity.root.a.badKey"));
}
+
+ @Test
+ public void testHDFSBackedProvider() throws Exception {
+ File testSchedulerConfigurationDir = new File(
+ TestMutableCSConfigurationProvider.class.getResource("").getPath()
+ + TestMutableCSConfigurationProvider.class.getSimpleName());
+ FileUtils.deleteDirectory(testSchedulerConfigurationDir);
+ testSchedulerConfigurationDir.mkdirs();
+
+ Configuration conf = new Configuration(false);
+ conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+ YarnConfiguration.HDFS_CONFIGURATION_STORE);
+ conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_HFDS_PATH,
+ testSchedulerConfigurationDir.getAbsolutePath());
+ writeConf(conf, testSchedulerConfigurationDir.getAbsolutePath());
+
+ confProvider.init(conf);
+ assertNull(confProvider.loadConfiguration(conf)
+ .get("yarn.scheduler.capacity.root.a.goodKey"));
+
+ doNothing().when(cs).reinitialize(any(Configuration.class),
+ any(RMContext.class));
+
+ confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
+ confProvider.confirmPendingMutation(true);
+ assertEquals("goodVal", confProvider.loadConfiguration(conf)
+ .get("yarn.scheduler.capacity.root.a.goodKey"));
+
+ confProvider.logAndApplyMutation(TEST_USER, badUpdate);
+ confProvider.confirmPendingMutation(false);
+ assertNull(confProvider.loadConfiguration(conf)
+ .get("yarn.scheduler.capacity.root.a.badKey"));
+
+ }
+
+ private void writeConf(Configuration conf, String storePath) throws IOException {
+ FileSystem fileSystem = FileSystem.get(new Configuration(conf));
+ String schedulerConfigurationFile = YarnConfiguration.CS_CONFIGURATION_FILE
+ + "." + System.currentTimeMillis();
+ FSDataOutputStream outputStream = fileSystem.create(
+ new Path(storePath, schedulerConfigurationFile));
+ conf.writeXml(outputStream);
+ outputStream.close();
+ }
}