diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index bc6d61a..6e9e367 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -806,12 +806,15 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String MEMORY_CONFIGURATION_STORE = "memory";
@Private
@Unstable
- public static final String LEVELDB_CONFIGURATION_STORE = "leveldb";
+ public static final String FS_CONFIGURATION_STORE = "fs";
@Private
@Unstable
public static final String ZK_CONFIGURATION_STORE = "zk";
@Private
@Unstable
+ public static final String LEVELDB_CONFIGURATION_STORE = "leveldb";
+ @Private
+ @Unstable
public static final String DEFAULT_CONFIGURATION_STORE =
FILE_CONFIGURATION_STORE;
@Private
@@ -839,6 +842,17 @@ public static boolean isAclEnabled(Configuration conf) {
@Private
@Unstable
public static final long DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS = 1000;
+ @Private
+ @Unstable
+ public static final String SCHEDULER_CONFIGURATION_FS_PATH =
+ YARN_PREFIX + "scheduler.configuration.fs.path";
+ @Private
+ @Unstable
+ public static final String SCHEDULER_CONFIGURATION_FS_MAX_VERSION =
+ YARN_PREFIX + "scheduler.configuration.max.version";
+ @Private
+ @Unstable
+ public static final int DEFAULT_SCHEDULER_CONFIGURATION_FS_MAX_VERSION = 100;
/** Parent znode path under which ZKConfigurationStore will create znodes. */
@Private
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index c32dd1f..5dd0f88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3638,6 +3638,25 @@
+ The filesystem path for store configuration files, use "hdfs://" when
+ store config file on hdfs, and use "file://" when store file on local
+ filesystem.
+
+ yarn.scheduler.configuration.fs.path
+ hdfs:///yarn/scheduler/conf
+
+
+
+
+ The max number of configuration file in filesystem.
+ Default is 100 for either.
+
+ yarn.scheduler.configuration.max.version
+ 100
+
+
+
+
ZK root node path for configuration store when using zookeeper-based
configuration store.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index dbe7680..d0cd0bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -376,6 +376,7 @@ void initScheduler(Configuration configuration) throws
case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
case YarnConfiguration.ZK_CONFIGURATION_STORE:
+ case YarnConfiguration.FS_CONFIGURATION_STORE:
this.csConfProvider = new MutableCSConfigurationProvider(rmContext);
break;
default:
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
new file mode 100644
index 0000000..2a24887
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+
+
+/**
+ * A filesystem implementation of {@link YarnConfigurationStore}. Offer
+ * configuration storage in FileSystem
+ */
+public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
+ public static final Log LOG = LogFactory.getLog(
+ FSSchedulerConfigurationStore.class);
+
+ @VisibleForTesting
+ protected static final Version CURRENT_VERSION_INFO
+ = Version.newInstance(0, 1);
+
+ private static final String TMP = ".tmp";
+
+ private int maxVersion;
+ private Path schedulerConfDir;
+ private FileSystem fileSystem;
+ private LogMutation pendingMutation;
+ private PathFilter configFilePathFilter;
+ private volatile Configuration schedConf;
+ private volatile Configuration oldConf;
+ private Path tempConfigPath;
+
+ @Override
+ public void initialize(Configuration conf, Configuration vSchedConf,
+ RMContext rmContext) throws Exception {
+ this.configFilePathFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ if (path == null) {
+ return false;
+ }
+ String pathName = path.getName();
+ return pathName.startsWith(YarnConfiguration.CS_CONFIGURATION_FILE)
+ && !pathName.endsWith(TMP);
+ }
+ };
+
+ String schedulerConfPathStr = conf.get(
+ YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH);
+ if (schedulerConfPathStr == null || schedulerConfPathStr.isEmpty()) {
+ throw new IOException(
+ YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH
+ + " must be set");
+ }
+ this.schedulerConfDir = new Path(schedulerConfPathStr);
+ this.fileSystem = this.schedulerConfDir.getFileSystem(conf);
+ this.maxVersion = conf.getInt(
+ YarnConfiguration.SCHEDULER_CONFIGURATION_FS_MAX_VERSION,
+ YarnConfiguration.DEFAULT_SCHEDULER_CONFIGURATION_FS_MAX_VERSION);
+ LOG.info("schedulerConfDir=" + schedulerConfPathStr);
+ LOG.info("capacity scheduler file max version = " + maxVersion);
+
+ if (!fileSystem.exists(schedulerConfDir)) {
+ if (!fileSystem.mkdirs(schedulerConfDir)) {
+ throw new IOException("mkdir " + schedulerConfPathStr + " failed");
+ }
+ }
+
+ // create capacity-schedule.xml.ts file if not existing
+ if (this.getConfigFileInputStream() == null) {
+ writeConfigurationToFileSystem(vSchedConf);
+ }
+
+ this.schedConf = this.getConfigurationFromFileSystem();
+ }
+
+ /**
+ * Update and persist latest configuration in temp file.
+ * @param logMutation configuration change to be persisted in write ahead log
+ * @throws IOException throw IOE when write temp configuration file fail
+ */
+ @Override
+ public void logMutation(LogMutation logMutation) throws IOException {
+ pendingMutation = logMutation;
+ LOG.info(new GsonBuilder().serializeNulls().create().toJson(logMutation));
+ oldConf = new Configuration(schedConf);
+ Map mutations = pendingMutation.getUpdates();
+ for (Map.Entry kv : mutations.entrySet()) {
+ if (kv.getValue() == null) {
+ this.schedConf.unset(kv.getKey());
+ } else {
+ this.schedConf.set(kv.getKey(), kv.getValue());
+ }
+ }
+ tempConfigPath = writeTmpConfig(schedConf);
+ }
+
+ /**
+ * @param isValid if true, finalize temp configuration file
+ * if false, remove temp configuration file and rollback
+ * @throws Exception throw IOE when write temp configuration file fail
+ */
+ @Override
+ public void confirmMutation(boolean isValid) throws Exception {
+ if (pendingMutation == null || tempConfigPath == null) {
+ LOG.warn("pendingMutation or tempConfigPath is null, do nothing");
+ return;
+ }
+ if (isValid) {
+ finalizeFileSystemFile();
+ } else {
+ schedConf = oldConf;
+ removeTmpConfigFile();
+ }
+ tempConfigPath = null;
+ }
+
+ private void finalizeFileSystemFile() throws IOException {
+ // call confirmMutation() make sure tempConfigPath is not null
+ Path finalConfigPath = getFinalConfigPath(tempConfigPath);
+ fileSystem.rename(tempConfigPath, finalConfigPath);
+ LOG.info("finalize temp configuration file successfully, finalConfigPath="
+ + finalConfigPath);
+ }
+
+ private Path getFinalConfigPath(Path tempPath) {
+ String tempConfigPathStr = tempPath.getName();
+ if (!tempConfigPathStr.endsWith(TMP)) {
+ LOG.warn(tempPath + " does not end with '"
+ + TMP + "' return null");
+ return null;
+ }
+ String finalConfigPathStr = tempConfigPathStr.substring(0,
+ (tempConfigPathStr.length() - TMP.length()));
+ return new Path(tempPath.getParent(), finalConfigPathStr);
+ }
+
+ private void removeTmpConfigFile() throws IOException {
+ // call confirmMutation() make sure tempConfigPath is not null
+ fileSystem.delete(tempConfigPath, true);
+ LOG.info("delete temp configuration file: " + tempConfigPath);
+ }
+
+ private Configuration getConfigurationFromFileSystem() throws IOException {
+ long start = Time.monotonicNow();
+
+ Configuration conf = new Configuration(false);
+ InputStream configInputStream = getConfigFileInputStream();
+ if (configInputStream == null) {
+ throw new IOException(
+ "no capacity scheduler file in " + this.schedulerConfDir);
+ }
+
+ conf.addResource(configInputStream);
+ Configuration result = new Configuration(false);
+ for (Map.Entry entry : conf) {
+ result.set(entry.getKey(), entry.getValue());
+ }
+ LOG.info("upload conf from fileSystem took "
+ + (Time.monotonicNow() - start) + " ms");
+
+ //for ha transition, local schedConf may be old one.
+ this.schedConf = result;
+ return result;
+ }
+
+ private InputStream getConfigFileInputStream() throws IOException {
+ Path lastestConfigPath = getLatestConfigPath();
+ if (lastestConfigPath == null) {
+ return null;
+ }
+ return fileSystem.open(lastestConfigPath);
+ }
+
+ private Path getLatestConfigPath() throws IOException {
+ FileStatus[] fileStatuses = fileSystem.listStatus(this.schedulerConfDir,
+ this.configFilePathFilter);
+
+ if (fileStatuses == null || fileStatuses.length == 0) {
+ return null;
+ }
+ Arrays.sort(fileStatuses);
+
+ return fileStatuses[fileStatuses.length - 1].getPath();
+ }
+
+ @VisibleForTesting
+ private Path writeTmpConfig(Configuration vSchedConf) throws IOException {
+ long start = Time.monotonicNow();
+ String tempSchedulerConfigFile = YarnConfiguration.CS_CONFIGURATION_FILE
+ + "." + System.currentTimeMillis() + TMP;
+
+ Path tempSchedulerConfigPath = new Path(this.schedulerConfDir,
+ tempSchedulerConfigFile);
+
+ try (FSDataOutputStream outputStream = fileSystem.create(
+ tempSchedulerConfigPath)) {
+ //clean configuration file when num exceed maxVersion
+ cleanConfigurationFile();
+
+ vSchedConf.writeXml(outputStream);
+ LOG.info(
+ "write temp capacity configuration successfully, schedulerConfigFile="
+ + tempSchedulerConfigPath);
+ } catch (IOException e) {
+ LOG.info("write temp capacity configuration fail, schedulerConfigFile="
+ + tempSchedulerConfigPath, e);
+ throw e;
+ }
+ LOG.info("write temp configuration to fileSystem took "
+ + (Time.monotonicNow() - start) + " ms");
+ return tempSchedulerConfigPath;
+ }
+
+ @VisibleForTesting
+ void writeConfigurationToFileSystem(Configuration vSchedConf)
+ throws IOException {
+ tempConfigPath = writeTmpConfig(vSchedConf);
+ finalizeFileSystemFile();
+ }
+
+ private void cleanConfigurationFile() throws IOException {
+ FileStatus[] fileStatuses = fileSystem.listStatus(this.schedulerConfDir,
+ this.configFilePathFilter);
+
+ if (fileStatuses == null || fileStatuses.length <= this.maxVersion) {
+ return;
+ }
+ Arrays.sort(fileStatuses);
+ int configFileNum = fileStatuses.length;
+ if (fileStatuses.length > this.maxVersion) {
+ for (int i = 0; i < configFileNum - this.maxVersion; i++) {
+ fileSystem.delete(fileStatuses[i].getPath(), false);
+ LOG.info("delete config file " + fileStatuses[i].getPath());
+ }
+ }
+ }
+
+ @Override
+ public Configuration retrieve() throws IOException {
+ return getConfigurationFromFileSystem();
+ }
+
+ @Override
+ public List getConfirmedConfHistory(long fromId) {
+ // Unimplemented.
+ return null;
+ }
+
+ @Override
+ protected Version getConfStoreVersion() throws Exception {
+ return null;
+ }
+
+ @Override
+ protected void storeVersion() throws Exception {
+
+ }
+
+ @Override
+ protected Version getCurrentVersion() {
+ return CURRENT_VERSION_INFO;
+ }
+
+ public void close() throws IOException {
+ if (fileSystem != null) {
+ fileSystem.close();
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index 40a19a4..9c3bf9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -77,6 +77,9 @@ public void init(Configuration config) throws IOException {
case YarnConfiguration.ZK_CONFIGURATION_STORE:
this.confStore = new ZKConfigurationStore();
break;
+ case YarnConfiguration.FS_CONFIGURATION_STORE:
+ this.confStore = new FSSchedulerConfigurationStore();
+ break;
default:
this.confStore = YarnConfigurationStoreFactory.getStore(config);
break;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index 7fb52fc..ef0a44b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -123,7 +123,7 @@ public void close() throws IOException {}
* Retrieve the persisted configuration.
* @return configuration as key-value
*/
- public abstract Configuration retrieve();
+ public abstract Configuration retrieve() throws IOException;
/**
* Get a list of confirmed configuration mutations starting from a given id.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
new file mode 100644
index 0000000..65314be
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests {@link FSSchedulerConfigurationStore}.
+ */
+public class TestFSSchedulerConfigurationStore {
+ private FSSchedulerConfigurationStore configurationStore;
+ private Configuration conf;
+ private File testSchedulerConfigurationDir;
+
+ @Before
+ public void setUp() throws Exception {
+ configurationStore = new FSSchedulerConfigurationStore();
+ testSchedulerConfigurationDir = new File(
+ TestFSSchedulerConfigurationStore.class.getResource("").getPath()
+ + FSSchedulerConfigurationStore.class.getSimpleName());
+ testSchedulerConfigurationDir.mkdirs();
+
+ conf = new Configuration();
+ conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH,
+ testSchedulerConfigurationDir.getAbsolutePath());
+ }
+
+ private void writeConf(Configuration config) throws IOException {
+ FileSystem fileSystem = FileSystem.get(new Configuration(config));
+ String schedulerConfigurationFile = YarnConfiguration.CS_CONFIGURATION_FILE
+ + "." + System.currentTimeMillis();
+ FSDataOutputStream outputStream = fileSystem.create(
+ new Path(testSchedulerConfigurationDir.getAbsolutePath(),
+ schedulerConfigurationFile));
+ config.writeXml(outputStream);
+ outputStream.close();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ FileUtils.deleteDirectory(testSchedulerConfigurationDir);
+ }
+
+ @Test
+ public void confirmMutationWithValid() throws Exception {
+ conf.setInt(
+ YarnConfiguration.SCHEDULER_CONFIGURATION_FS_MAX_VERSION, 2);
+ conf.set("a", "a");
+ conf.set("b", "b");
+ conf.set("c", "c");
+ writeConf(conf);
+ configurationStore.initialize(conf, conf, null);
+ Configuration storeConf = configurationStore.retrieve();
+ compareConfig(conf, storeConf);
+
+ Map updates = new HashMap<>();
+ updates.put("a", null);
+ updates.put("b", "bb");
+
+ Configuration expectConfig = new Configuration(conf);
+ expectConfig.unset("a");
+ expectConfig.set("b", "bb");
+
+ LogMutation logMutation = new LogMutation(updates, "test");
+ configurationStore.logMutation(logMutation);
+ configurationStore.confirmMutation(true);
+ storeConf = configurationStore.retrieve();
+ assertEquals(null, storeConf.get("a"));
+ assertEquals("bb", storeConf.get("b"));
+ assertEquals("c", storeConf.get("c"));
+
+ compareConfig(expectConfig, storeConf);
+
+ updates.put("b", "bbb");
+ configurationStore.logMutation(logMutation);
+ configurationStore.confirmMutation(true);
+ storeConf = configurationStore.retrieve();
+ assertEquals(null, storeConf.get("a"));
+ assertEquals("bbb", storeConf.get("b"));
+ assertEquals("c", storeConf.get("c"));
+ }
+
+ @Test
+ public void confirmMutationWithInValid() throws Exception {
+ conf.set("a", "a");
+ conf.set("b", "b");
+ conf.set("c", "c");
+ writeConf(conf);
+ configurationStore.initialize(conf, conf, null);
+ Configuration storeConf = configurationStore.retrieve();
+ compareConfig(conf, storeConf);
+
+ Map updates = new HashMap<>();
+ updates.put("a", null);
+ updates.put("b", "bb");
+
+ LogMutation logMutation = new LogMutation(updates, "test");
+ configurationStore.logMutation(logMutation);
+ configurationStore.confirmMutation(false);
+ storeConf = configurationStore.retrieve();
+
+ compareConfig(conf, storeConf);
+ }
+
+ @Test
+ public void retrieve() throws Exception {
+ Configuration schedulerConf = new Configuration();
+ schedulerConf.set("a", "a");
+ schedulerConf.setLong("long", 1L);
+ schedulerConf.setBoolean("boolean", true);
+ writeConf(schedulerConf);
+
+ configurationStore.initialize(conf, conf, null);
+ Configuration storedConfig = configurationStore.retrieve();
+
+ compareConfig(schedulerConf, storedConfig);
+ }
+
+ @Test
+ public void checkVersion() {
+ try {
+ configurationStore.checkVersion();
+ } catch (Exception e) {
+ fail("checkVersion throw exception");
+ }
+ }
+
+ private void compareConfig(Configuration schedulerConf,
+ Configuration storedConfig) {
+ for (Map.Entry entry : schedulerConf) {
+ assertEquals(entry.getKey(), schedulerConf.get(entry.getKey()),
+ storedConfig.get(entry.getKey()));
+ }
+
+ for (Map.Entry entry : storedConfig) {
+ assertEquals(entry.getKey(), storedConfig.get(entry.getKey()),
+ schedulerConf.get(entry.getKey()));
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
index 5d43ebb..81bc7a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
@@ -18,7 +18,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
@@ -30,6 +34,8 @@
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -99,4 +105,48 @@ public void testInMemoryBackedProvider() throws Exception {
assertNull(confProvider.loadConfiguration(conf).get(
"yarn.scheduler.capacity.root.a.badKey"));
}
+
+ @Test
+ public void testHDFSBackedProvider() throws Exception {
+ File testSchedulerConfigurationDir = new File(
+ TestMutableCSConfigurationProvider.class.getResource("").getPath()
+ + TestMutableCSConfigurationProvider.class.getSimpleName());
+ FileUtils.deleteDirectory(testSchedulerConfigurationDir);
+ testSchedulerConfigurationDir.mkdirs();
+
+ Configuration conf = new Configuration(false);
+ conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+ YarnConfiguration.FS_CONFIGURATION_STORE);
+ conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH,
+ testSchedulerConfigurationDir.getAbsolutePath());
+ writeConf(conf, testSchedulerConfigurationDir.getAbsolutePath());
+
+ confProvider.init(conf);
+ assertNull(confProvider.loadConfiguration(conf)
+ .get("yarn.scheduler.capacity.root.a.goodKey"));
+
+ confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
+ confProvider.confirmPendingMutation(true);
+ assertEquals("goodVal", confProvider.loadConfiguration(conf)
+ .get("yarn.scheduler.capacity.root.a.goodKey"));
+
+ assertNull(confProvider.loadConfiguration(conf).get(
+ "yarn.scheduler.capacity.root.a.badKey"));
+ confProvider.logAndApplyMutation(TEST_USER, badUpdate);
+ confProvider.confirmPendingMutation(false);
+ assertNull(confProvider.loadConfiguration(conf).get(
+ "yarn.scheduler.capacity.root.a.badKey"));
+
+ }
+
+ private void writeConf(Configuration conf, String storePath)
+ throws IOException {
+ FileSystem fileSystem = FileSystem.get(new Configuration(conf));
+ String schedulerConfigurationFile = YarnConfiguration.CS_CONFIGURATION_FILE
+ + "." + System.currentTimeMillis();
+ try (FSDataOutputStream outputStream = fileSystem.create(
+ new Path(storePath, schedulerConfigurationFile))) {
+ conf.writeXml(outputStream);
+ }
+ }
}