diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 4799137..aaa27c9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -698,6 +698,9 @@ public static boolean isAclEnabled(Configuration conf) {
   public static final String LEVELDB_CONFIGURATION_STORE = "leveldb";
   @Private
   @Unstable
+  public static final String HDFS_CONFIGURATION_STORE = "hdfs";
+  @Private
+  @Unstable
   public static final String ZK_CONFIGURATION_STORE = "zk";
   @Private
   @Unstable
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6f630f8..8037a48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -306,6 +306,7 @@ void initScheduler(Configuration configuration) throws
     case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
     case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
     case YarnConfiguration.ZK_CONFIGURATION_STORE:
+    case YarnConfiguration.HDFS_CONFIGURATION_STORE:
       this.csConfProvider = new MutableCSConfigurationProvider(rmContext);
       break;
     default:
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/HDFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/HDFSSchedulerConfigurationStore.java
new file mode 100644
index 0000000..8e8bfc1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/HDFSSchedulerConfigurationStore.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.YARN_PREFIX;
+
+/**
+ * An HDFS-backed implementation of {@link YarnConfigurationStore}. Offers
+ * scheduler configuration storage in HDFS.
+ */
+public class HDFSSchedulerConfigurationStore extends YarnConfigurationStore {
+
+  private static final Log LOG =
+      LogFactory.getLog(HDFSSchedulerConfigurationStore.class);
+
+  public static final String SCHEDULER_CONFIGURATION_HDFS_PATH = YARN_PREFIX
+      + "scheduler.configuration.hdfs.path";
+  public static final String SCHEDULER_CONFIGURATION_MAX_VERSION = YARN_PREFIX
+      + "scheduler.configuration.max.version";
+
+  @VisibleForTesting
+  protected static final Version CURRENT_VERSION_INFO = Version
+      .newInstance(0, 1);
+
+  private static final String TMP = ".tmp";
+
+  private int maxVersion;
+  private Path schedulerConfDir;
+  private FileSystem fileSystem;
+  private LogMutation pendingMutation;
+  private PathFilter configFilePathFilter;
+  private volatile Configuration schedConf;
+
+  @Override
+  public void initialize(Configuration conf, Configuration schedConf,
+      RMContext rmContext) throws Exception {
+    this.configFilePathFilter = new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        String pathName = path.getName();
+        return pathName.startsWith(YarnConfiguration.CS_CONFIGURATION_FILE)
+            && !pathName.endsWith(TMP);
+      }
+    };
+
+    String schedulerConfPathStr = conf.get(SCHEDULER_CONFIGURATION_HDFS_PATH);
+    if (schedulerConfPathStr == null || schedulerConfPathStr.isEmpty()) {
+      throw new IOException(
+          SCHEDULER_CONFIGURATION_HDFS_PATH + " must be set");
+    }
+
+    this.schedulerConfDir = new Path(schedulerConfPathStr);
+    this.fileSystem = this.schedulerConfDir.getFileSystem(conf);
+    this.maxVersion = conf.getInt(SCHEDULER_CONFIGURATION_MAX_VERSION, 100);
+    LOG.info("schedulerConfDir=" + schedulerConfPathStr);
+    LOG.info("capacity scheduler file max version = " + maxVersion);
+
+    if (!fileSystem.exists(schedulerConfDir)) {
+      if (!fileSystem.mkdirs(schedulerConfDir)) {
+        throw new IOException("mkdir " + schedulerConfPathStr + " failed");
+      }
+    }
+
+    // write an initial capacity-scheduler.xml.<timestamp> file if none exists
+    if (getLatestConfigPath() == null) {
+      writeConfigurationToHdfs(schedConf);
+    }
+
+    this.schedConf = this.getConfigurationFromHdfs();
+  }
+
+  // TODO: persist the mutation log; a pending mutation can be lost if HDFS
+  // is unreachable or the ResourceManager stops before it is confirmed.
+  @Override
+  public synchronized void logMutation(LogMutation logMutation) {
+    pendingMutation = logMutation;
+    LOG.info(new GsonBuilder().serializeNulls().create().toJson(logMutation));
+  }
+
+  @Override
+  protected Version getConfStoreVersion() throws Exception {
+    return null;
+  }
+
+  @Override
+  protected void storeVersion() throws Exception {
+    // Does nothing.
+  }
+
+  @Override
+  protected Version getCurrentVersion() {
+    return null;
+  }
+
+  @Override
+  public void checkVersion() {
+    // Does nothing. (Version is always compatible.)
+  }
+
+  @Override
+  public List<LogMutation> getConfirmedConfHistory(long fromId) {
+    return null; // unimplemented
+  }
+
+  @Override
+  public synchronized void confirmMutation(boolean isValid)
+      throws IOException {
+    if (isValid) {
+      for (Map.Entry<String, String> kv : pendingMutation.getUpdates()
+          .entrySet()) {
+        if (kv.getValue() == null) {
+          this.schedConf.unset(kv.getKey());
+        } else {
+          this.schedConf.set(kv.getKey(), kv.getValue());
+        }
+      }
+      writeConfigurationToHdfs(schedConf);
+    }
+    pendingMutation = null;
+  }
+
+  @Override
+  public synchronized Configuration retrieve() throws IOException {
+    return getConfigurationFromHdfs();
+  }
+
+  private Configuration getConfigurationFromHdfs() throws IOException {
+    long start = Time.monotonicNow();
+
+    Configuration conf = new Configuration(false);
+    InputStream configInputStream = getConfigFileInputStream();
+    if (configInputStream == null) {
+      throw new IOException(
+          "no capacity scheduler file in " + this.schedulerConfDir);
+    }
+
+    conf.addResource(configInputStream);
+    Configuration result = new Configuration(false);
+    for (Map.Entry<String, String> entry : conf) {
+      result.set(entry.getKey(), entry.getValue());
+    }
+    LOG.info("loaded conf from hdfs in " + (Time.monotonicNow() - start)
+        + " ms");
+    this.schedConf = result;
+    return result;
+  }
+
+  private InputStream getConfigFileInputStream() throws IOException {
+    Path latestConfigPath = getLatestConfigPath();
+    if (latestConfigPath == null) {
+      return null;
+    }
+    return fileSystem.open(latestConfigPath);
+  }
+
+  private Path getLatestConfigPath() throws IOException {
+    FileStatus[] fileStatuses = fileSystem.listStatus(this.schedulerConfDir,
+        this.configFilePathFilter);
+
+    if (fileStatuses == null || fileStatuses.length == 0) {
+      return null;
+    }
+    Arrays.sort(fileStatuses);
+    return fileStatuses[fileStatuses.length - 1].getPath();
+  }
+
+  @VisibleForTesting
+  void writeConfigurationToHdfs(Configuration schedConf) {
+    long start = Time.monotonicNow();
+    while (true) {
+      // use a different file name on each retry
+      String schedulerConfigFile = YarnConfiguration.CS_CONFIGURATION_FILE
+          + "." + System.currentTimeMillis();
+      String tmpSchedulerConfigFile = YarnConfiguration.CS_CONFIGURATION_FILE
+          + "." + System.currentTimeMillis() + TMP;
+
+      Path schedulerConfigFilePath = new Path(this.schedulerConfDir,
+          schedulerConfigFile);
+      Path tmpSchedulerConfigFilePath = new Path(this.schedulerConfDir,
+          tmpSchedulerConfigFile);
+
+      try {
+        // prune old configuration files when their number exceeds maxVersion
+        cleanConfigurationFile();
+        FSDataOutputStream outputStream = fileSystem.create(
+            tmpSchedulerConfigFilePath);
+        schedConf.writeXml(outputStream);
+        outputStream.close();
+        fileSystem.rename(tmpSchedulerConfigFilePath,
+            schedulerConfigFilePath);
+
+        LOG.info("wrote capacity scheduler configuration successfully, "
+            + "schedulerConfigFile=" + schedulerConfigFile);
+        break;
+      } catch (Exception e) {
+        LOG.warn("failed to write capacity scheduler configuration, "
+            + "schedulerConfigFile=" + schedulerConfigFile, e);
+        try {
+          Thread.sleep(3000);
+        } catch (InterruptedException e1) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+    LOG.info("wrote conf to hdfs in " + (Time.monotonicNow() - start)
+        + " ms");
+  }
+
+  private void cleanConfigurationFile() throws IOException {
+    FileStatus[] fileStatuses = fileSystem.listStatus(this.schedulerConfDir,
+        this.configFilePathFilter);
+
+    if (fileStatuses == null || fileStatuses.length <= this.maxVersion) {
+      return;
+    }
+    Arrays.sort(fileStatuses);
+    int filesToDelete = fileStatuses.length - this.maxVersion;
+    for (int i = 0; i < filesToDelete; i++) {
+      fileSystem.delete(fileStatuses[i].getPath(), false);
+      LOG.info("deleted config file " + fileStatuses[i].getPath());
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index 40a19a4..17e70ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -77,6 +77,9 @@ public void init(Configuration config) throws IOException {
     case YarnConfiguration.ZK_CONFIGURATION_STORE:
       this.confStore = new ZKConfigurationStore();
       break;
+    case YarnConfiguration.HDFS_CONFIGURATION_STORE:
+      this.confStore = new HDFSSchedulerConfigurationStore();
+      break;
     default:
       this.confStore = YarnConfigurationStoreFactory.getStore(config);
       break;
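Taken together, the changes above make "hdfs" a recognized value of yarn.scheduler.configuration.store.class and wire it to the new store. A minimal Java sketch of how a deployment might select this store; the keys are the constants defined in this patch, while the HDFS path value is purely illustrative:

    Configuration conf = new YarnConfiguration();
    // pick the HDFS-backed scheduler configuration store
    conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
        YarnConfiguration.HDFS_CONFIGURATION_STORE);
    // directory that will hold versioned capacity-scheduler.xml.<timestamp> files
    conf.set(HDFSSchedulerConfigurationStore.SCHEDULER_CONFIGURATION_HDFS_PATH,
        "hdfs:///yarn/scheduler-conf");
    // optional: cap the number of retained versions (the store defaults to 100)
    conf.setInt(
        HDFSSchedulerConfigurationStore.SCHEDULER_CONFIGURATION_MAX_VERSION, 50);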
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index 7fb52fc..ef0a44b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -123,7 +123,8 @@ public void close() throws IOException {}
    * Retrieve the persisted configuration.
    * @return configuration as key-value
+   * @throws IOException if the configuration cannot be read from the store
    */
-  public abstract Configuration retrieve();
+  public abstract Configuration retrieve() throws IOException;
 
   /**
    * Get a list of confirmed configuration mutations starting from a given id.
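Since retrieve() can now fail with an IOException, callers must handle the HDFS read path explicitly. For orientation, the mutation lifecycle against the new store looks roughly like this, a sketch condensed from the tests below (the queue key and user name are illustrative):

    HDFSSchedulerConfigurationStore store = new HDFSSchedulerConfigurationStore();
    store.initialize(conf, initialSchedConf, rmContext);

    Map<String, String> updates = new HashMap<>();
    updates.put("yarn.scheduler.capacity.root.a.capacity", "50"); // illustrative
    updates.put("some.obsolete.key", null); // a null value unsets the key

    store.logMutation(new YarnConfigurationStore.LogMutation(updates, "admin"));
    store.confirmMutation(true); // apply the updates and persist a new versioned file
    Configuration latest = store.retrieve(); // reads the newest version from HDFS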
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestHDFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestHDFSSchedulerConfigurationStore.java
new file mode 100644
index 0000000..82625fb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestHDFSSchedulerConfigurationStore.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
+import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
+import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestHDFSSchedulerConfigurationStore
+    extends ConfigurationStoreBaseTest {
+
+  private TestingServer curatorTestingServer;
+  private File testSchedulerConfigurationDir;
+
+  @Override
+  protected YarnConfigurationStore createConfStore() {
+    return new HDFSSchedulerConfigurationStore();
+  }
+
+  public static TestingServer setupCuratorServer() throws Exception {
+    TestingServer curatorTestingServer = new TestingServer();
+    curatorTestingServer.start();
+    return curatorTestingServer;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    curatorTestingServer = setupCuratorServer();
+    testSchedulerConfigurationDir = new File(
+        TestHDFSSchedulerConfigurationStore.class.getResource("").getPath()
+            + HDFSSchedulerConfigurationStore.class.getSimpleName());
+    testSchedulerConfigurationDir.mkdirs();
+
+    conf = new Configuration();
+    conf.set(HDFSSchedulerConfigurationStore.SCHEDULER_CONFIGURATION_HDFS_PATH,
+        testSchedulerConfigurationDir.getAbsolutePath());
+  }
+
+  private void writeConf(Configuration conf) throws IOException {
+    FileSystem fileSystem = FileSystem.get(new Configuration(conf));
+    String schedulerConfigurationFile = YarnConfiguration.CS_CONFIGURATION_FILE
+        + "." + System.currentTimeMillis();
+    FSDataOutputStream outputStream = fileSystem.create(
+        new Path(testSchedulerConfigurationDir.getAbsolutePath(),
+            schedulerConfigurationFile));
+    conf.writeXml(outputStream);
+    outputStream.close();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    FileUtils.deleteDirectory(testSchedulerConfigurationDir);
+    curatorTestingServer.stop();
+  }
+
+  @Test
+  public void initialize() throws Exception {
+    confStore.initialize(conf, new Configuration(false), null);
+    confStore.retrieve().writeXml(System.out);
+  }
+
+  @Test
+  public void confirmMutationWithValid() throws Exception {
+    conf.setInt(
+        HDFSSchedulerConfigurationStore.SCHEDULER_CONFIGURATION_MAX_VERSION,
+        2);
+    conf.set("a", "a");
+    conf.set("b", "b");
+    conf.set("c", "c");
+    writeConf(conf);
+    confStore.initialize(conf, conf, null);
+
+    Map<String, String> updates = new HashMap<>();
+    updates.put("a", null);
+    updates.put("b", "bb");
+
+    Configuration expectConfig = new Configuration(conf);
+    expectConfig.unset("a");
+    expectConfig.set("b", "bb");
+
+    LogMutation logMutation = new LogMutation(updates, "test");
+
+    confStore.logMutation(logMutation);
+    confStore.confirmMutation(true);
+    Configuration storeConf = confStore.retrieve();
+    assertNull(storeConf.get("a"));
+    assertEquals("bb", storeConf.get("b"));
+    assertEquals("c", storeConf.get("c"));
+
+    compareConfig(expectConfig, storeConf);
+
+    updates.put("b", "bbb");
+
+    confStore.logMutation(logMutation);
+    confStore.confirmMutation(true);
+    storeConf = confStore.retrieve();
+    assertNull(storeConf.get("a"));
+    assertEquals("bbb", storeConf.get("b"));
+    assertEquals("c", storeConf.get("c"));
+  }
+
+  @Test
+  public void confirmMutationWithInvalid() throws Exception {
+    conf.set("a", "a");
+    conf.set("b", "b");
+    conf.set("c", "c");
+    writeConf(conf);
+    confStore.initialize(conf, conf, null);
+
+    Map<String, String> updates = new HashMap<>();
+    updates.put("a", null);
+    updates.put("b", "bb");
+
+    LogMutation logMutation = new LogMutation(updates, "test");
+
+    confStore.logMutation(logMutation);
+    confStore.confirmMutation(false);
+    Configuration storeConf = confStore.retrieve();
+
+    compareConfig(conf, storeConf);
+  }
+
+  @Test
+  public void retrieve() throws Exception {
+    Configuration schedulerConf = new Configuration();
+    schedulerConf.set("a", "a");
+    schedulerConf.setLong("long", 1L);
+    schedulerConf.setBoolean("boolean", true);
+    writeConf(schedulerConf);
+
+    confStore.initialize(conf, conf, null);
+    Configuration storedConfig = confStore.retrieve();
+
+    compareConfig(schedulerConf, storedConfig);
+  }
+
+  private void compareConfig(Configuration schedulerConf,
+      Configuration storedConfig) {
+    for (Map.Entry<String, String> entry : schedulerConf) {
+      assertEquals(entry.getKey(), schedulerConf.get(entry.getKey()),
+          storedConfig.get(entry.getKey()));
+    }
+
+    for (Map.Entry<String, String> entry : storedConfig) {
+      assertEquals(entry.getKey(), storedConfig.get(entry.getKey()),
+          schedulerConf.get(entry.getKey()));
+    }
+  }
+
+  public Configuration createRMHAConf(String rmIds, String rmId,
+      int adminPort) {
+    Configuration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER,
+        CapacityScheduler.class, CapacityScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+        YarnConfiguration.HDFS_CONFIGURATION_STORE);
+    conf.set(HDFSSchedulerConfigurationStore.SCHEDULER_CONFIGURATION_HDFS_PATH,
+        testSchedulerConfigurationDir.getAbsolutePath());
+    conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
+    conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+        curatorTestingServer.getConnectString());
+    conf.set(YarnConfiguration.RM_HA_ID, rmId);
+    conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
+    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    for (String rpcAddress :
+        YarnConfiguration.getServiceAddressConfKeys(conf)) {
+      for (String id : HAUtil.getRMHAIds(conf)) {
+        conf.set(HAUtil.addSuffix(rpcAddress, id), "localhost:0");
+      }
+    }
+    conf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId),
+        "localhost:" + adminPort);
+    return conf;
+  }
+
+  /**
+   * When failing over, the new active RM should read the current state of the
+   * store, including any updates made while it was in standby.
+   * @throws Exception on test failure
+   */
+  @Test
+  public void testFailoverReadsFromUpdatedStore() throws Exception {
+    HAServiceProtocol.StateChangeRequestInfo req =
+        new HAServiceProtocol.StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    Configuration conf1 = createRMHAConf("rm1,rm2", "rm1", 1234);
+    ResourceManager rm1 = new MockRM(conf1);
+    rm1.start();
+    rm1.getRMContext().getRMAdminService().transitionToActive(req);
+    assertEquals("RM with ZKStore didn't start",
+        Service.STATE.STARTED, rm1.getServiceState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+    assertNull(((MutableConfScheduler) rm1.getResourceScheduler())
+        .getConfiguration().get("key"));
+
+    Configuration conf2 = createRMHAConf("rm1,rm2", "rm2", 5678);
+    ResourceManager rm2 = new MockRM(conf2);
+    rm2.start();
+    assertEquals("RM should be Standby",
+        HAServiceProtocol.HAServiceState.STANDBY,
+        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    // Update configuration on RM1
+    SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
+    schedConfUpdateInfo.getGlobalParams().put("key", "val");
+    MutableConfigurationProvider confProvider = ((MutableConfScheduler)
+        rm1.getResourceScheduler()).getMutableConfProvider();
+    UserGroupInformation user = UserGroupInformation
+        .createUserForTesting(TEST_USER, new String[0]);
+    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
+    assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
+        .getConfiguration().get("key"));
+    confProvider.confirmPendingMutation(true);
+    assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
+        .getConfStore().retrieve().get("key"));
+    // The next update is not persisted, so it should not be recovered
+    schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
+    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+
+    // Start RM2 and verify that it starts with the updated configuration
+    rm2.getRMContext().getRMAdminService().transitionToActive(req);
+    assertEquals("RM with ZKStore didn't start",
+        Service.STATE.STARTED, rm2.getServiceState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    for (int i = 0; i < 10000 / 50; i++) {
+      if (HAServiceProtocol.HAServiceState.ACTIVE ==
+          rm1.getRMContext().getRMAdminService().getServiceStatus()
+              .getState()) {
+        Thread.sleep(100);
+      }
+    }
+    assertEquals("RM should have been fenced",
+        HAServiceProtocol.HAServiceState.STANDBY,
+        rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    assertEquals("val", ((MutableCSConfigurationProvider) (
+        (CapacityScheduler) rm2.getResourceScheduler())
+        .getMutableConfProvider()).getConfStore().retrieve().get("key"));
+    assertEquals("val", ((MutableConfScheduler) rm2.getResourceScheduler())
+        .getConfiguration().get("key"));
+    // Transition to standby will set RM's HA status and then reinitialize in
+    // a separate thread. Despite asserting for STANDBY state, it's
+    // possible for reinitialization to be unfinished. Wait here for it to
+    // finish, otherwise closing rm1 will close zkManager and the unfinished
+    // reinitialization will throw an exception.
+    Thread.sleep(10000);
+    rm1.close();
+    rm2.close();
+  }
+
+  /**
+   * When failing over, if RM1 stopped and removed a queue that RM2 still has
+   * in memory, failing over to RM2 should not throw an exception.
+   * @throws Exception on test failure
+   */
+  @Test
+  public void testFailoverAfterRemoveQueue() throws Exception {
+    HAServiceProtocol.StateChangeRequestInfo req =
+        new HAServiceProtocol.StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    Configuration conf1 = createRMHAConf("rm1,rm2", "rm1", 1234);
+    ResourceManager rm1 = new MockRM(conf1);
+    rm1.start();
+    rm1.getRMContext().getRMAdminService().transitionToActive(req);
+    assertEquals("RM with ZKStore didn't start",
+        Service.STATE.STARTED, rm1.getServiceState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    Configuration conf2 = createRMHAConf("rm1,rm2", "rm2", 5678);
+    ResourceManager rm2 = new MockRM(conf2);
+    rm2.start();
+    assertEquals("RM should be Standby",
+        HAServiceProtocol.HAServiceState.STANDBY,
+        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    UserGroupInformation user = UserGroupInformation
+        .createUserForTesting(TEST_USER, new String[0]);
+    MutableConfigurationProvider confProvider = ((MutableConfScheduler)
+        rm1.getResourceScheduler()).getMutableConfProvider();
+    // Add root.a
+    SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
+    Map<String, String> addParams = new HashMap<>();
+    addParams.put("capacity", "100");
+    QueueConfigInfo addInfo = new QueueConfigInfo("root.a", addParams);
+    schedConfUpdateInfo.getAddQueueInfo().add(addInfo);
+    // Stop root.default
+    Map<String, String> stopParams = new HashMap<>();
+    stopParams.put("state", "STOPPED");
+    stopParams.put("capacity", "0");
+    QueueConfigInfo stopInfo = new QueueConfigInfo("root.default", stopParams);
+    schedConfUpdateInfo.getUpdateQueueInfo().add(stopInfo);
+    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
+    confProvider.confirmPendingMutation(true);
+    assertTrue(Arrays.asList(
+        ((MutableConfScheduler) rm1.getResourceScheduler()).getConfiguration()
+            .get("yarn.scheduler.capacity.root.queues").split(","))
+        .contains("a"));
+
+    // Remove root.default
+    schedConfUpdateInfo.getUpdateQueueInfo().clear();
+    schedConfUpdateInfo.getAddQueueInfo().clear();
+    schedConfUpdateInfo.getRemoveQueueInfo().add("root.default");
+    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
+    confProvider.confirmPendingMutation(true);
+    assertEquals("a", ((MutableConfScheduler) rm1.getResourceScheduler())
+        .getConfiguration().get("yarn.scheduler.capacity.root.queues"));
+
+    // Start RM2 and verify that it starts with the updated configuration
+    rm2.getRMContext().getRMAdminService().transitionToActive(req);
+    assertEquals("RM with ZKStore didn't start",
+        Service.STATE.STARTED, rm2.getServiceState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    for (int i = 0; i < 10000 / 50; i++) {
+      if (HAServiceProtocol.HAServiceState.ACTIVE ==
+          rm1.getRMContext().getRMAdminService().getServiceStatus()
+              .getState()) {
+        Thread.sleep(100);
+      }
+    }
+    assertEquals("RM should have been fenced",
+        HAServiceProtocol.HAServiceState.STANDBY,
+        rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    assertEquals("a", ((MutableCSConfigurationProvider) (
+        (CapacityScheduler) rm2.getResourceScheduler())
+        .getMutableConfProvider()).getConfStore().retrieve()
+        .get("yarn.scheduler.capacity.root.queues"));
+    assertEquals("a", ((MutableConfScheduler) rm2.getResourceScheduler())
+        .getConfiguration().get("yarn.scheduler.capacity.root.queues"));
+    // Transition to standby will set RM's HA status and then reinitialize in
+    // a separate thread. Despite asserting for STANDBY state, it's
+    // possible for reinitialization to be unfinished. Wait here for it to
+    // finish, otherwise closing rm1 will close zkManager and the unfinished
+    // reinitialization will throw an exception.
+    Thread.sleep(10000);
+    rm1.close();
+    rm2.close();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
index 5d43ebb..014ed46 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
@@ -18,7 +18,11 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
@@ -30,11 +34,15 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -99,4 +107,48 @@ public void testInMemoryBackedProvider() throws Exception {
     assertNull(confProvider.loadConfiguration(conf).get(
         "yarn.scheduler.capacity.root.a.badKey"));
   }
+
+  @Test
+  public void testHDFSBackedProvider() throws Exception {
+    File testSchedulerConfigurationDir = new File(
+        TestMutableCSConfigurationProvider.class.getResource("").getPath()
+            + TestMutableCSConfigurationProvider.class.getSimpleName());
+    FileUtils.deleteDirectory(testSchedulerConfigurationDir);
+    testSchedulerConfigurationDir.mkdirs();
+
+    Configuration conf = new Configuration(false);
+    conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+        YarnConfiguration.HDFS_CONFIGURATION_STORE);
+    conf.set(HDFSSchedulerConfigurationStore.SCHEDULER_CONFIGURATION_HDFS_PATH,
+        testSchedulerConfigurationDir.getAbsolutePath());
+    writeConf(conf, testSchedulerConfigurationDir.getAbsolutePath());
+
+    confProvider.init(conf);
+    assertNull(confProvider.loadConfiguration(conf)
+        .get("yarn.scheduler.capacity.root.a.goodKey"));
+
+    doNothing().when(cs).reinitialize(any(Configuration.class),
+        any(RMContext.class));
+
+    confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
+    confProvider.confirmPendingMutation(true);
+    assertEquals("goodVal", confProvider.loadConfiguration(conf)
+        .get("yarn.scheduler.capacity.root.a.goodKey"));
+
+    confProvider.logAndApplyMutation(TEST_USER, badUpdate);
+    confProvider.confirmPendingMutation(false);
+    assertNull(confProvider.loadConfiguration(conf)
+        .get("yarn.scheduler.capacity.root.a.badKey"));
+  }
+
+  private void writeConf(Configuration conf, String storePath)
+      throws IOException {
+    FileSystem fileSystem = FileSystem.get(new Configuration(conf));
+    String schedulerConfigurationFile = YarnConfiguration.CS_CONFIGURATION_FILE
+        + "." + System.currentTimeMillis();
+    FSDataOutputStream outputStream = fileSystem.create(
+        new Path(storePath, schedulerConfigurationFile));
+    conf.writeXml(outputStream);
+    outputStream.close();
+  }
 }
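On disk, the store keeps one timestamped copy of the scheduler file per confirmed mutation and prunes the oldest copies once the count exceeds the configured maximum; an in-flight write carries a .tmp suffix and is ignored by the path filter until it is renamed into place. An illustrative directory listing (path and timestamps are made up):

    /yarn/scheduler-conf/capacity-scheduler.xml.1530000000000
    /yarn/scheduler-conf/capacity-scheduler.xml.1530000300000      <- latest; returned by retrieve()
    /yarn/scheduler-conf/capacity-scheduler.xml.1530000600000.tmp  <- in-flight write; skipped by the filter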