diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java index bcdfb5924de..b43bcb839c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java @@ -344,11 +344,15 @@ protected DB getDB() { @Override public void storeVersion() throws Exception { - String key = VERSION_KEY; - byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto() + storeVersion(CURRENT_VERSION_INFO); + } + + @VisibleForTesting + protected void storeVersion(Version version) throws Exception { + byte[] data = ((VersionPBImpl) version).getProto() .toByteArray(); try { - db.put(bytes(key), data); + db.put(bytes(VERSION_KEY), data); } catch (DBException e) { throw new IOException(e); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfStoreVersionIncompatibleException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfStoreVersionIncompatibleException.java new file mode 100644 index 00000000000..2213ee24c6e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfStoreVersionIncompatibleException.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * This exception is thrown by {@link YarnConfigurationStore} if it's loading + * an incompatible persisted schema version. + */ +public class YarnConfStoreVersionIncompatibleException extends + YarnException { + private static final long serialVersionUID = -2829858253579013629L; + + public YarnConfStoreVersionIncompatibleException(Throwable cause) { + super(cause); + } + + public YarnConfStoreVersionIncompatibleException(String message) { + super(message); + } + + public YarnConfStoreVersionIncompatibleException( + String message, Throwable cause) { + super(message, cause); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java index 34aa17428c2..9985d6539a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java @@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateVersionIncompatibleException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import java.io.IOException; @@ -103,6 +102,7 @@ public void close() throws IOException {} /** * Logs the configuration change to backing store. + * * @param logMutation configuration change to be persisted in write ahead log * @throws IOException if logging fails */ @@ -169,23 +169,21 @@ public abstract void confirmMutation(LogMutation pendingMutation, protected abstract Version getCurrentVersion(); public void checkVersion() throws Exception { - // TODO this was taken from RMStateStore. Should probably refactor Version loadedVersion = getConfStoreVersion(); - LOG.info("Loaded configuration store version info " + loadedVersion); - if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) { + Version currentVersion = getCurrentVersion(); + LOG.info("Loaded configuration store version info {}", loadedVersion); + + // when currentVersion is null, the checkVersion method is overridden + if (currentVersion.equals(loadedVersion)) { return; } // if there is no version info, treat it as CURRENT_VERSION_INFO; - if (loadedVersion == null) { - loadedVersion = getCurrentVersion(); - } - if (loadedVersion.isCompatibleTo(getCurrentVersion())) { - LOG.info("Storing configuration store version info " - + getCurrentVersion()); + if (loadedVersion == null || currentVersion.isCompatibleTo(loadedVersion)) { + LOG.info("Storing configuration store version info {}", currentVersion); storeVersion(); } else { - throw new RMStateVersionIncompatibleException( - "Expecting configuration store version " + getCurrentVersion() + throw new YarnConfStoreVersionIncompatibleException( + "Expecting configuration store version " + currentVersion + ", but loading version " + loadedVersion); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java index 4b3153a0d5a..3a8c362812c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java @@ -34,16 +34,13 @@ * Base class for {@link YarnConfigurationStore} implementations. */ public abstract class ConfigurationStoreBaseTest { + static final String TEST_USER = "testUser"; + YarnConfigurationStore confStore = createConfStore(); + Configuration conf; + Configuration schedConf; + RMContext rmContext; - protected YarnConfigurationStore confStore = createConfStore(); - - protected abstract YarnConfigurationStore createConfStore(); - - protected Configuration conf; - protected Configuration schedConf; - protected RMContext rmContext; - - protected static final String TEST_USER = "testUser"; + abstract YarnConfigurationStore createConfStore(); @Before public void setUp() throws Exception { @@ -59,20 +56,12 @@ public void testConfigurationUpdate() throws Exception { confStore.initialize(conf, schedConf, rmContext); assertEquals("val1", confStore.retrieve().get("key1")); - Map update1 = new HashMap<>(); - update1.put("keyUpdate1", "valUpdate1"); - YarnConfigurationStore.LogMutation mutation1 = - new YarnConfigurationStore.LogMutation(update1, TEST_USER); - confStore.logMutation(mutation1); - confStore.confirmMutation(mutation1, true); + confStore.confirmMutation(prepareLogMutation("keyUpdate1", "valUpdate1"), + true); assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1")); - Map update2 = new HashMap<>(); - update2.put("keyUpdate2", "valUpdate2"); - YarnConfigurationStore.LogMutation mutation2 = - new YarnConfigurationStore.LogMutation(update2, TEST_USER); - confStore.logMutation(mutation2); - confStore.confirmMutation(mutation2, false); + confStore.confirmMutation(prepareLogMutation("keyUpdate2", "valUpdate2"), + false); assertNull("Configuration should not be updated", confStore.retrieve().get("keyUpdate2")); confStore.close(); @@ -84,13 +73,20 @@ public void testNullConfigurationUpdate() throws Exception { confStore.initialize(conf, schedConf, rmContext); assertEquals("val", confStore.retrieve().get("key")); + confStore.confirmMutation(prepareLogMutation("key", null), true); + assertNull(confStore.retrieve().get("key")); + confStore.close(); + } + + YarnConfigurationStore.LogMutation prepareLogMutation(String key, + String value) + throws Exception { Map update = new HashMap<>(); - update.put("key", null); + update.put(key, value); YarnConfigurationStore.LogMutation mutation = new YarnConfigurationStore.LogMutation(update, TEST_USER); confStore.logMutation(mutation); - confStore.confirmMutation(mutation, true); - assertNull(confStore.retrieve().get("key")); - confStore.close(); + + return mutation; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java new file mode 100644 index 00000000000..1a472692747 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.records.Version; +import org.junit.Test; + +import java.util.LinkedList; +import java.util.function.Supplier; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public abstract class PersistentConfigurationStoreBaseTest extends + ConfigurationStoreBaseTest { + + @Test + public void testGetConfigurationVersion() throws Exception { + confStore.initialize(conf, schedConf, rmContext); + long v1 = confStore.getConfigVersion(); + assertEquals(1, v1); + confStore.confirmMutation(prepareLogMutation("keyver", "valver"), true); + long v2 = confStore.getConfigVersion(); + assertEquals(2, v2); + confStore.close(); + } + + @Test + public void testPersistConfiguration() throws Exception { + schedConf.set("key", "val"); + confStore.initialize(conf, schedConf, rmContext); + assertEquals("val", confStore.retrieve().get("key")); + confStore.close(); + + // Create a new configuration store, and check for old configuration + confStore = createConfStore(); + schedConf.set("key", "badVal"); + // Should ignore passed-in scheduler configuration. + confStore.initialize(conf, schedConf, rmContext); + assertEquals("val", confStore.retrieve().get("key")); + confStore.close(); + } + + @Test + public void testPersistUpdatedConfiguration() throws Exception { + confStore.initialize(conf, schedConf, rmContext); + assertNull(confStore.retrieve().get("key")); + + confStore.confirmMutation(prepareLogMutation("key", "val"), true); + assertEquals("val", confStore.retrieve().get("key")); + confStore.close(); + + // Create a new configuration store, and check for updated configuration + confStore = createConfStore(); + schedConf.set("key", "badVal"); + // Should ignore passed-in scheduler configuration. + confStore.initialize(conf, schedConf, rmContext); + assertEquals("val", confStore.retrieve().get("key")); + confStore.close(); + } + + void testVersionHelper(Version expectedVersion) throws Exception { + confStore.initialize(conf, schedConf, rmContext); + assertNull(confStore.getConfStoreVersion()); + confStore.checkVersion(); + assertEquals(expectedVersion, + confStore.getConfStoreVersion()); + confStore.close(); + } + + void testMaxLogsHelper( + Supplier> logSupplier) + throws Exception { + conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2); + confStore.initialize(conf, schedConf, rmContext); + LinkedList logs = logSupplier.get(); + assertEquals(0, logs.size()); + + YarnConfigurationStore.LogMutation mutation = + prepareLogMutation("key1", "val1"); + logs = logSupplier.get(); + assertEquals(1, logs.size()); + assertEquals("val1", logs.get(0).getUpdates().get("key1")); + confStore.confirmMutation(mutation, true); + assertEquals(1, logs.size()); + assertEquals("val1", logs.get(0).getUpdates().get("key1")); + + mutation = prepareLogMutation("key2", "val2"); + logs = logSupplier.get(); + assertEquals(2, logs.size()); + assertEquals("val1", logs.get(0).getUpdates().get("key1")); + assertEquals("val2", logs.get(1).getUpdates().get("key2")); + confStore.confirmMutation(mutation, true); + assertEquals(2, logs.size()); + assertEquals("val1", logs.get(0).getUpdates().get("key1")); + assertEquals("val2", logs.get(1).getUpdates().get("key2")); + + // Next update should purge first update from logs. + mutation = prepareLogMutation("key3", "val3"); + logs = logSupplier.get(); + assertEquals(2, logs.size()); + assertEquals("val2", logs.get(0).getUpdates().get("key2")); + assertEquals("val3", logs.get(1).getUpdates().get("key3")); + confStore.confirmMutation(mutation, true); + assertEquals(2, logs.size()); + assertEquals("val2", logs.get(0).getUpdates().get("key2")); + assertEquals("val3", logs.get(1).getUpdates().get("key3")); + } + + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java index e4ca3d3ada7..4a14c8cfa9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java @@ -20,7 +20,6 @@ import java.io.File; import java.io.IOException; -import java.util.HashMap; import java.util.Map; import org.apache.commons.io.FileUtils; @@ -31,33 +30,31 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; +import org.hamcrest.CoreMatchers; import org.junit.After; import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertThat; /** * Tests {@link FSSchedulerConfigurationStore}. */ -public class TestFSSchedulerConfigurationStore { - private FSSchedulerConfigurationStore configurationStore; - private Configuration conf; +public class TestFSSchedulerConfigurationStore extends + PersistentConfigurationStoreBaseTest { private File testSchedulerConfigurationDir; @Before public void setUp() throws Exception { - configurationStore = new FSSchedulerConfigurationStore(); + super.setUp(); testSchedulerConfigurationDir = new File( TestFSSchedulerConfigurationStore.class.getResource("").getPath() + FSSchedulerConfigurationStore.class.getSimpleName()); testSchedulerConfigurationDir.mkdirs(); - conf = new Configuration(); conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH, testSchedulerConfigurationDir.getAbsolutePath()); } @@ -79,70 +76,51 @@ public void tearDown() throws Exception { } @Test - public void confirmMutationWithValid() throws Exception { - conf.setInt( - YarnConfiguration.SCHEDULER_CONFIGURATION_FS_MAX_VERSION, 2); - conf.set("a", "a"); - conf.set("b", "b"); - conf.set("c", "c"); - writeConf(conf); - configurationStore.initialize(conf, conf, null); - Configuration storeConf = configurationStore.retrieve(); - compareConfig(conf, storeConf); - - Map updates = new HashMap<>(); - updates.put("a", null); - updates.put("b", "bb"); - - Configuration expectConfig = new Configuration(conf); - expectConfig.unset("a"); - expectConfig.set("b", "bb"); - - LogMutation logMutation = new LogMutation(updates, "test"); - configurationStore.logMutation(logMutation); - configurationStore.confirmMutation(logMutation, true); - storeConf = configurationStore.retrieve(); - assertEquals(null, storeConf.get("a")); - assertEquals("bb", storeConf.get("b")); - assertEquals("c", storeConf.get("c")); - - compareConfig(expectConfig, storeConf); - - updates.put("b", "bbb"); - configurationStore.logMutation(logMutation); - configurationStore.confirmMutation(logMutation, true); - storeConf = configurationStore.retrieve(); - assertEquals(null, storeConf.get("a")); - assertEquals("bbb", storeConf.get("b")); - assertEquals("c", storeConf.get("c")); + public void checkVersion() { + try { + confStore.checkVersion(); + } catch (Exception e) { + fail("checkVersion throw exception"); + } + } + + @Test + public void testConfigRetrieval() throws Exception { + Configuration schedulerConf = new Configuration(); + schedulerConf.set("a", "a"); + schedulerConf.setLong("long", 1L); + schedulerConf.setBoolean("boolean", true); + writeConf(schedulerConf); + + confStore.initialize(conf, conf, null); + Configuration storedConfig = confStore.retrieve(); + + compareConfig(schedulerConf, storedConfig); } @Test - public void confirmMutationWithInValid() throws Exception { - conf.set("a", "a"); - conf.set("b", "b"); - conf.set("c", "c"); - writeConf(conf); - configurationStore.initialize(conf, conf, null); - Configuration storeConf = configurationStore.retrieve(); - compareConfig(conf, storeConf); - - Map updates = new HashMap<>(); - updates.put("a", null); - updates.put("b", "bb"); - - LogMutation logMutation = new LogMutation(updates, "test"); - configurationStore.logMutation(logMutation); - configurationStore.confirmMutation(logMutation, false); - storeConf = configurationStore.retrieve(); - - compareConfig(conf, storeConf); + public void testFormatConfiguration() throws Exception { + Configuration persistedSchedConf = new Configuration(); + persistedSchedConf.set("a", "a"); + writeConf(persistedSchedConf); + confStore.initialize(conf, conf, null); + Configuration storedConfig = confStore.retrieve(); + assertEquals("a", storedConfig.get("a")); + confStore.format(); + try { + confStore.retrieve(); + fail("Expected an IOException with message containing \"no capacity " + + "scheduler file in\" to be thrown"); + } catch (IOException e) { + assertThat(e.getMessage(), + CoreMatchers.containsString("no capacity scheduler file in")); + } } @Test public void testFileSystemClose() throws Exception { MiniDFSCluster hdfsCluster = null; - FileSystem fs = null; + FileSystem fs; Path path = new Path("/tmp/confstore"); try { HdfsConfiguration hdfsConfig = new HdfsConfiguration(); @@ -154,21 +132,15 @@ public void testFileSystemClose() throws Exception { fs.mkdirs(path); } - FSSchedulerConfigurationStore configStore = - new FSSchedulerConfigurationStore(); hdfsConfig.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH, path.toString()); - configStore.initialize(hdfsConfig, hdfsConfig, null); + confStore.initialize(hdfsConfig, hdfsConfig, null); // Close the FileSystem object and validate fs.close(); try { - Map updates = new HashMap<>(); - updates.put("testkey", "testvalue"); - LogMutation logMutation = new LogMutation(updates, "test"); - configStore.logMutation(logMutation); - configStore.confirmMutation(logMutation, true); + confStore.confirmMutation(prepareLogMutation("key", "val"), true); } catch (IOException e) { if (e.getMessage().contains("Filesystem closed")) { fail("FSSchedulerConfigurationStore failed to handle " + @@ -178,61 +150,17 @@ public void testFileSystemClose() throws Exception { } } } finally { + assert hdfsCluster != null; fs = hdfsCluster.getFileSystem(); if (fs.exists(path)) { fs.delete(path, true); } - if (hdfsCluster != null) { - hdfsCluster.shutdown(); - } - } - } - - @Test - public void testFormatConfiguration() throws Exception { - Configuration schedulerConf = new Configuration(); - schedulerConf.set("a", "a"); - writeConf(schedulerConf); - configurationStore.initialize(conf, conf, null); - Configuration storedConfig = configurationStore.retrieve(); - assertEquals("a", storedConfig.get("a")); - configurationStore.format(); - boolean exceptionCaught = false; - try { - storedConfig = configurationStore.retrieve(); - } catch (IOException e) { - if (e.getMessage().contains("no capacity scheduler file in")) { - exceptionCaught = true; - } - } - assertTrue(exceptionCaught); - } - - @Test - public void retrieve() throws Exception { - Configuration schedulerConf = new Configuration(); - schedulerConf.set("a", "a"); - schedulerConf.setLong("long", 1L); - schedulerConf.setBoolean("boolean", true); - writeConf(schedulerConf); - - configurationStore.initialize(conf, conf, null); - Configuration storedConfig = configurationStore.retrieve(); - - compareConfig(schedulerConf, storedConfig); - } - - @Test - public void checkVersion() { - try { - configurationStore.checkVersion(); - } catch (Exception e) { - fail("checkVersion throw exception"); + hdfsCluster.shutdown(); } } private void compareConfig(Configuration schedulerConf, - Configuration storedConfig) { + Configuration storedConfig) { for (Map.Entry entry : schedulerConf) { assertEquals(entry.getKey(), schedulerConf.get(entry.getKey()), storedConfig.get(entry.getKey())); @@ -243,4 +171,9 @@ private void compareConfig(Configuration schedulerConf, schedulerConf.get(entry.getKey())); } } + + @Override + public YarnConfigurationStore createConfStore() { + return new FSSchedulerConfigurationStore(); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java index c40d16a27cb..7f4d397eb12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java @@ -18,6 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; +import org.junit.Test; + +import static org.junit.Assert.fail; + /** * Tests {@link InMemoryConfigurationStore}. */ @@ -27,4 +31,13 @@ protected YarnConfigurationStore createConfStore() { return new InMemoryConfigurationStore(); } + + @Test + public void checkVersion() { + try { + confStore.checkVersion(); + } catch (Exception e) { + fail("checkVersion throw exception"); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java index 0ae7624d78c..77cf7a59b53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; +import org.apache.hadoop.yarn.server.records.Version; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileUtil; @@ -37,18 +38,20 @@ import java.io.File; import java.nio.charset.StandardCharsets; -import java.util.HashMap; import java.util.LinkedList; import java.util.Map; +import java.util.function.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; /** * Tests {@link LeveldbConfigurationStore}. */ -public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest { +public class TestLeveldbConfigurationStore extends + PersistentConfigurationStoreBaseTest { public static final Logger LOG = LoggerFactory.getLogger(TestLeveldbConfigurationStore.class); @@ -57,8 +60,6 @@ System.getProperty("java.io.tmpdir")), TestLeveldbConfigurationStore.class.getName()); - private ResourceManager rm; - @Before public void setUp() throws Exception { super.setUp(); @@ -69,52 +70,23 @@ public void setUp() throws Exception { } @Test - public void testVersioning() throws Exception { - confStore.initialize(conf, schedConf, rmContext); - assertNull(confStore.getConfStoreVersion()); - confStore.checkVersion(); - assertEquals(LeveldbConfigurationStore.CURRENT_VERSION_INFO, - confStore.getConfStoreVersion()); - confStore.close(); + public void testVersion() throws Exception { + testVersionHelper(LeveldbConfigurationStore.CURRENT_VERSION_INFO); } - @Test - public void testPersistConfiguration() throws Exception { - schedConf.set("key", "val"); - confStore.initialize(conf, schedConf, rmContext); - assertEquals("val", confStore.retrieve().get("key")); - confStore.close(); - - // Create a new configuration store, and check for old configuration - confStore = createConfStore(); - schedConf.set("key", "badVal"); - // Should ignore passed-in scheduler configuration. - confStore.initialize(conf, schedConf, rmContext); - assertEquals("val", confStore.retrieve().get("key")); - confStore.close(); - } + @Test(expected = YarnConfStoreVersionIncompatibleException.class) + public void testIncompatibleVersion() throws Exception { + try { + confStore.initialize(conf, schedConf, rmContext); - @Test - public void testPersistUpdatedConfiguration() throws Exception { - confStore.initialize(conf, schedConf, rmContext); - assertNull(confStore.retrieve().get("key")); - - Map update = new HashMap<>(); - update.put("key", "val"); - YarnConfigurationStore.LogMutation mutation = - new YarnConfigurationStore.LogMutation(update, TEST_USER); - confStore.logMutation(mutation); - confStore.confirmMutation(mutation, true); - assertEquals("val", confStore.retrieve().get("key")); - confStore.close(); + Version otherVersion = Version.newInstance(1, 1); + ((LeveldbConfigurationStore) confStore).storeVersion(otherVersion); - // Create a new configuration store, and check for updated configuration - confStore = createConfStore(); - schedConf.set("key", "badVal"); - // Should ignore passed-in scheduler configuration. - confStore.initialize(conf, schedConf, rmContext); - assertEquals("val", confStore.retrieve().get("key")); - confStore.close(); + assertEquals(otherVersion, confStore.getConfStoreVersion()); + confStore.checkVersion(); + } finally { + confStore.close(); + } } @Test @@ -122,11 +94,7 @@ public void testDisableAuditLogs() throws Exception { conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0); confStore.initialize(conf, schedConf, rmContext); - Map update = new HashMap<>(); - update.put("key1", "val1"); - YarnConfigurationStore.LogMutation mutation = - new YarnConfigurationStore.LogMutation(update, TEST_USER); - confStore.logMutation(mutation); + prepareLogMutation("key1", "val1"); boolean logKeyPresent = false; DB db = ((LeveldbConfigurationStore) confStore).getDB(); @@ -146,50 +114,15 @@ public void testDisableAuditLogs() throws Exception { @Test public void testMaxLogs() throws Exception { - conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2); - confStore.initialize(conf, schedConf, rmContext); - LinkedList logs = - ((LeveldbConfigurationStore) confStore).getLogs(); - assertEquals(0, logs.size()); - - Map update1 = new HashMap<>(); - update1.put("key1", "val1"); - YarnConfigurationStore.LogMutation mutation = - new YarnConfigurationStore.LogMutation(update1, TEST_USER); - confStore.logMutation(mutation); - logs = ((LeveldbConfigurationStore) confStore).getLogs(); - assertEquals(1, logs.size()); - assertEquals("val1", logs.get(0).getUpdates().get("key1")); - confStore.confirmMutation(mutation, true); - assertEquals(1, logs.size()); - assertEquals("val1", logs.get(0).getUpdates().get("key1")); - - Map update2 = new HashMap<>(); - update2.put("key2", "val2"); - mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER); - confStore.logMutation(mutation); - logs = ((LeveldbConfigurationStore) confStore).getLogs(); - assertEquals(2, logs.size()); - assertEquals("val1", logs.get(0).getUpdates().get("key1")); - assertEquals("val2", logs.get(1).getUpdates().get("key2")); - confStore.confirmMutation(mutation, true); - assertEquals(2, logs.size()); - assertEquals("val1", logs.get(0).getUpdates().get("key1")); - assertEquals("val2", logs.get(1).getUpdates().get("key2")); - - // Next update should purge first update from logs. - Map update3 = new HashMap<>(); - update3.put("key3", "val3"); - mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER); - confStore.logMutation(mutation); - logs = ((LeveldbConfigurationStore) confStore).getLogs(); - assertEquals(2, logs.size()); - assertEquals("val2", logs.get(0).getUpdates().get("key2")); - assertEquals("val3", logs.get(1).getUpdates().get("key3")); - confStore.confirmMutation(mutation, true); - assertEquals(2, logs.size()); - assertEquals("val2", logs.get(0).getUpdates().get("key2")); - assertEquals("val3", logs.get(1).getUpdates().get("key3")); + Supplier> logSupplier = () -> { + try { + return ((LeveldbConfigurationStore) confStore).getLogs(); + } catch (Exception e) { + fail("getLogs should not throw exception"); + return new LinkedList<>(); + } + }; + testMaxLogsHelper(logSupplier); confStore.close(); } @@ -222,7 +155,7 @@ public void testRestartReadsFromUpdatedStore() throws Exception { .getConfStore().retrieve().get("key")); // Next update is not persisted, it should not be recovered schedConfUpdateInfo.getGlobalParams().put("key", "badVal"); - log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + confProvider.logAndApplyMutation(user, schedConfUpdateInfo); rm1.close(); // Start RM2 and verifies it starts with updated configuration diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java index e67e382f71e..4ff76c13671 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java @@ -18,9 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; -import org.apache.hadoop.util.curator.ZKCuratorManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; @@ -30,8 +27,11 @@ import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service; +import org.apache.hadoop.util.curator.ZKCuratorManager; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore; @@ -41,25 +41,32 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.function.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests {@link ZKConfigurationStore}. */ -public class TestZKConfigurationStore extends ConfigurationStoreBaseTest { - +public class TestZKConfigurationStore extends + PersistentConfigurationStoreBaseTest { public static final Logger LOG = LoggerFactory.getLogger(TestZKConfigurationStore.class); @@ -85,6 +92,7 @@ public static CuratorFramework setupCuratorFramework( } @Before + @Override public void setUp() throws Exception { super.setUp(); curatorTestingServer = setupCuratorServer(); @@ -105,31 +113,32 @@ public void cleanup() throws IOException { } @Test - public void testVersioning() throws Exception { - confStore.initialize(conf, schedConf, rmContext); - assertNull(confStore.getConfStoreVersion()); - confStore.checkVersion(); - assertEquals(ZKConfigurationStore.CURRENT_VERSION_INFO, - confStore.getConfStoreVersion()); + public void testVersion() throws Exception { + testVersionHelper(ZKConfigurationStore.CURRENT_VERSION_INFO); } - @Test - public void testPersistConfiguration() throws Exception { - schedConf.set("key", "val"); + @Test(expected = YarnConfStoreVersionIncompatibleException.class) + public void testIncompatibleVersion() throws Exception { confStore.initialize(conf, schedConf, rmContext); - assertEquals("val", confStore.retrieve().get("key")); - - assertNull(confStore.retrieve().get(YarnConfiguration.RM_HOSTNAME)); - // Create a new configuration store, and check for old configuration - confStore = createConfStore(); - schedConf.set("key", "badVal"); - // Should ignore passed-in scheduler configuration. - confStore.initialize(conf, schedConf, rmContext); - assertEquals("val", confStore.retrieve().get("key")); + Version otherVersion = Version.newInstance(1, 1); + String znodeParentPath = conf.get(YarnConfiguration. + RM_SCHEDCONF_STORE_ZK_PARENT_PATH, + YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH); + String zkVersionPath = ZKCuratorManager.getNodePath(znodeParentPath, + "VERSION"); + String fencingNodePath = ZKCuratorManager.getNodePath(znodeParentPath, + "FENCING"); + byte[] versionData = + ((VersionPBImpl) otherVersion).getProto().toByteArray(); + List zkAcl = ZKCuratorManager.getZKAcls(conf); + ((ZKConfigurationStore) confStore).zkManager.safeCreate(zkVersionPath, + versionData, zkAcl, CreateMode.PERSISTENT, zkAcl, fencingNodePath); + + assertEquals(otherVersion, confStore.getConfStoreVersion()); + confStore.checkVersion(); } - @Test public void testFormatConfiguration() throws Exception { schedConf.set("key", "val"); @@ -139,90 +148,6 @@ public void testFormatConfiguration() throws Exception { assertNull(confStore.retrieve()); } - @Test - public void testGetConfigurationVersion() throws Exception { - confStore.initialize(conf, schedConf, rmContext); - long v1 = confStore.getConfigVersion(); - assertEquals(1, v1); - Map update = new HashMap<>(); - update.put("keyver", "valver"); - YarnConfigurationStore.LogMutation mutation = - new YarnConfigurationStore.LogMutation(update, TEST_USER); - confStore.logMutation(mutation); - confStore.confirmMutation(mutation, true); - long v2 = confStore.getConfigVersion(); - assertEquals(2, v2); - } - - @Test - public void testPersistUpdatedConfiguration() throws Exception { - confStore.initialize(conf, schedConf, rmContext); - assertNull(confStore.retrieve().get("key")); - - Map update = new HashMap<>(); - update.put("key", "val"); - LogMutation mutation = new LogMutation(update, TEST_USER); - confStore.logMutation(mutation); - confStore.confirmMutation(mutation, true); - assertEquals("val", confStore.retrieve().get("key")); - - // Create a new configuration store, and check for updated configuration - confStore = createConfStore(); - schedConf.set("key", "badVal"); - // Should ignore passed-in scheduler configuration. - confStore.initialize(conf, schedConf, rmContext); - assertEquals("val", confStore.retrieve().get("key")); - } - - @Test - public void testMaxLogs() throws Exception { - conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2); - confStore.initialize(conf, schedConf, rmContext); - LinkedList logs = - ((ZKConfigurationStore) confStore).getLogs(); - assertEquals(0, logs.size()); - - Map update1 = new HashMap<>(); - update1.put("key1", "val1"); - YarnConfigurationStore.LogMutation mutation = - new YarnConfigurationStore.LogMutation(update1, TEST_USER); - confStore.logMutation(mutation); - logs = ((ZKConfigurationStore) confStore).getLogs(); - assertEquals(1, logs.size()); - assertEquals("val1", logs.get(0).getUpdates().get("key1")); - confStore.confirmMutation(mutation, true); - assertEquals(1, logs.size()); - assertEquals("val1", logs.get(0).getUpdates().get("key1")); - - Map update2 = new HashMap<>(); - update2.put("key2", "val2"); - mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER); - confStore.logMutation(mutation); - logs = ((ZKConfigurationStore) confStore).getLogs(); - assertEquals(2, logs.size()); - assertEquals("val1", logs.get(0).getUpdates().get("key1")); - assertEquals("val2", logs.get(1).getUpdates().get("key2")); - confStore.confirmMutation(mutation, true); - assertEquals(2, logs.size()); - assertEquals("val1", logs.get(0).getUpdates().get("key1")); - assertEquals("val2", logs.get(1).getUpdates().get("key2")); - - // Next update should purge first update from logs. - Map update3 = new HashMap<>(); - update3.put("key3", "val3"); - mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER); - confStore.logMutation(mutation); - logs = ((ZKConfigurationStore) confStore).getLogs(); - assertEquals(2, logs.size()); - assertEquals("val2", logs.get(0).getUpdates().get("key2")); - assertEquals("val3", logs.get(1).getUpdates().get("key3")); - confStore.confirmMutation(mutation, true); - assertEquals(2, logs.size()); - assertEquals("val2", logs.get(0).getUpdates().get("key2")); - assertEquals("val3", logs.get(1).getUpdates().get("key3")); - } - - @Test public void testDisableAuditLogs() throws Exception { conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0); @@ -234,16 +159,25 @@ public void testDisableAuditLogs() throws Exception { byte[] data = null; ((ZKConfigurationStore) confStore).zkManager.setData(logsPath, data, -1); - Map update = new HashMap<>(); - update.put("key1", "val1"); - YarnConfigurationStore.LogMutation mutation = - new YarnConfigurationStore.LogMutation(update, TEST_USER); - confStore.logMutation(mutation); + prepareLogMutation("key1", "val1"); data = ((ZKConfigurationStore) confStore).zkManager.getData(logsPath); assertNull("Failed to Disable Audit Logs", data); } + @Test + public void testMaxLogs() throws Exception { + Supplier> logSupplier = () -> { + try { + return ((ZKConfigurationStore) confStore).getLogs(); + } catch (Exception e) { + fail("getLogs should not throw an Exception"); + return new LinkedList<>(); + } + }; + testMaxLogsHelper(logSupplier); + } + public Configuration createRMHAConf(String rmIds, String rmId, int adminPort) { Configuration conf = new YarnConfiguration();