diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
index eeb38d38db1..74694318b5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
@@ -271,8 +271,6 @@ public long getConfigVersion() throws Exception {
}
}
-
-
@VisibleForTesting
private Path writeTmpConfig(Configuration vSchedConf) throws IOException {
long start = Time.monotonicNow();
@@ -337,12 +335,12 @@ public Configuration retrieve() throws IOException {
}
@Override
- protected Version getConfStoreVersion() throws Exception {
+ protected Version getConfStoreVersion() {
return null;
}
@Override
- protected void storeVersion() throws Exception {
+ protected void storeVersion() {
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
index d031ea984ff..8e790dd69bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
@@ -87,13 +87,13 @@ public long getConfigVersion() {
}
@Override
- public Version getConfStoreVersion() throws Exception {
+ public Version getConfStoreVersion() {
// Does nothing.
return null;
}
@Override
- public void storeVersion() throws Exception {
+ public void storeVersion() {
// Does nothing.
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
index bcdfb5924de..b43bcb839c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -344,11 +344,15 @@ protected DB getDB() {
@Override
public void storeVersion() throws Exception {
- String key = VERSION_KEY;
- byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto()
+ storeVersion(CURRENT_VERSION_INFO);
+ }
+
+ @VisibleForTesting
+ protected void storeVersion(Version version) throws Exception {
+ byte[] data = ((VersionPBImpl) version).getProto()
.toByteArray();
try {
- db.put(bytes(key), data);
+ db.put(bytes(VERSION_KEY), data);
} catch (DBException e) {
throw new IOException(e);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfStoreVersionIncompatibleException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfStoreVersionIncompatibleException.java
new file mode 100644
index 00000000000..b0f03c9a8a6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfStoreVersionIncompatibleException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+public class YarnConfStoreVersionIncompatibleException extends
+ YarnException {
+ private static final long serialVersionUID = -2829858253579013629L;
+
+ public YarnConfStoreVersionIncompatibleException(Throwable cause) {
+ super(cause);
+ }
+
+ public YarnConfStoreVersionIncompatibleException(String message) {
+ super(message);
+ }
+
+ public YarnConfStoreVersionIncompatibleException(
+ String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index 34aa17428c2..437190d3d28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,13 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateVersionIncompatibleException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -38,7 +37,7 @@
* should first log it with {@code logMutation}, which persists this pending
* mutation. This mutation is merged to the persisted configuration only after
* {@code confirmMutation} is called.
- *
+ *
* On startup/recovery, caller should call {@code retrieve} to get all
* confirmed mutations, then get pending mutations which were not confirmed via
* {@code getPendingMutations}, and replay/confirm them via
@@ -48,61 +47,31 @@
public static final Logger LOG =
LoggerFactory.getLogger(YarnConfigurationStore.class);
- /**
- * LogMutation encapsulates the fields needed for configuration mutation
- * audit logging and recovery.
- */
- public static class LogMutation implements Serializable {
- private Map updates;
- private String user;
-
- /**
- * Create log mutation.
- * @param updates key-value configuration updates
- * @param user user who requested configuration change
- */
- LogMutation(Map updates, String user) {
- this.updates = updates;
- this.user = user;
- }
-
- /**
- * Get key-value configuration updates.
- * @return map of configuration updates
- */
- public Map getUpdates() {
- return updates;
- }
-
- /**
- * Get user who requested configuration change.
- * @return user who requested configuration change
- */
- public String getUser() {
- return user;
- }
- }
/**
* Initialize the configuration store, with schedConf as the initial
* scheduler configuration. If a persisted store already exists, use the
* scheduler configuration stored there, and ignore schedConf.
- * @param conf configuration to initialize store with
+ *
+ * @param conf configuration to initialize store with
* @param schedConf Initial key-value scheduler configuration to persist.
* @param rmContext RMContext for this configuration store
* @throws IOException if initialization fails
*/
public abstract void initialize(Configuration conf, Configuration schedConf,
- RMContext rmContext) throws Exception;
+ RMContext rmContext) throws Exception;
/**
* Closes the configuration store, releasing any required resources.
+ *
* @throws IOException on failure to close
*/
- public void close() throws IOException {}
+ public void close() throws IOException {
+ }
/**
* Logs the configuration change to backing store.
+ *
* @param logMutation configuration change to be persisted in write ahead log
* @throws IOException if logging fails
*/
@@ -113,35 +82,39 @@ public void close() throws IOException {}
* last logged by {@code logMutation} and marks the mutation as persisted (no
* longer pending). If isValid is true, merge the mutation with the persisted
* configuration.
+ *
* @param pendingMutation the log mutation to apply
- * @param isValid if true, update persisted configuration with pending
- * mutation.
+ * @param isValid if true, update persisted configuration with pending
+ * mutation.
* @throws Exception if mutation confirmation fails
*/
public abstract void confirmMutation(LogMutation pendingMutation,
- boolean isValid) throws Exception;
+ boolean isValid) throws Exception;
/**
* Retrieve the persisted configuration.
+ *
* @return configuration as key-value
*/
public abstract Configuration retrieve() throws IOException;
-
/**
* Format the persisted configuration.
+ *
* @throws IOException on failure to format
*/
public abstract void format() throws Exception;
/**
* Get the last updated config version.
+ *
* @return Last updated config version.
*/
public abstract long getConfigVersion() throws Exception;
/**
* Get a list of confirmed configuration mutations starting from a given id.
+ *
* @param fromId id from which to start getting mutations, inclusive
* @return list of configuration mutations
*/
@@ -150,6 +123,7 @@ public abstract void confirmMutation(LogMutation pendingMutation,
/**
* Get schema version of persisted conf store, for detecting compatibility
* issues when changing conf store schema.
+ *
* @return Schema version currently used by the persisted configuration store.
* @throws Exception On version fetch failure
*/
@@ -157,6 +131,7 @@ public abstract void confirmMutation(LogMutation pendingMutation,
/**
* Persist the hard-coded schema version to the conf store.
+ *
* @throws Exception On storage failure
*/
protected abstract void storeVersion() throws Exception;
@@ -164,30 +139,67 @@ public abstract void confirmMutation(LogMutation pendingMutation,
/**
* Get the hard-coded schema version, for comparison against the schema
* version currently persisted.
+ *
* @return Current hard-coded schema version
*/
protected abstract Version getCurrentVersion();
public void checkVersion() throws Exception {
- // TODO this was taken from RMStateStore. Should probably refactor
Version loadedVersion = getConfStoreVersion();
- LOG.info("Loaded configuration store version info " + loadedVersion);
- if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
+ Version currentVersion = getCurrentVersion();
+ LOG.info("Loaded configuration store version info {}", loadedVersion);
+
+ // when currentVersion is null, the checkVersion method is overridden
+ if (currentVersion.equals(loadedVersion)) {
return;
}
// if there is no version info, treat it as CURRENT_VERSION_INFO;
- if (loadedVersion == null) {
- loadedVersion = getCurrentVersion();
- }
- if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
- LOG.info("Storing configuration store version info "
- + getCurrentVersion());
+ if (loadedVersion == null || currentVersion.isCompatibleTo(loadedVersion)) {
+ LOG.info("Storing configuration store version info {}", currentVersion);
storeVersion();
} else {
- throw new RMStateVersionIncompatibleException(
- "Expecting configuration store version " + getCurrentVersion()
+ throw new YarnConfStoreVersionIncompatibleException(
+ "Expecting configuration store version " + currentVersion
+ ", but loading version " + loadedVersion);
}
}
+ /**
+ * LogMutation encapsulates the fields needed for configuration mutation
+ * audit logging and recovery.
+ */
+ public static class LogMutation implements Serializable {
+ private Map updates;
+ private String user;
+
+ /**
+ * Create log mutation.
+ *
+ * @param updates key-value configuration updates
+ * @param user user who requested configuration change
+ */
+ LogMutation(Map updates, String user) {
+ this.updates = updates;
+ this.user = user;
+ }
+
+ /**
+ * Get key-value configuration updates.
+ *
+ * @return map of configuration updates
+ */
+ public Map getUpdates() {
+ return updates;
+ }
+
+ /**
+ * Get user who requested configuration change.
+ *
+ * @return user who requested configuration change
+ */
+ public String getUser() {
+ return user;
+ }
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
index 4b3153a0d5a..7258e44d675 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
@@ -17,9 +17,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -27,6 +24,11 @@
import org.junit.Before;
import org.junit.Test;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.function.Supplier;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -34,16 +36,13 @@
* Base class for {@link YarnConfigurationStore} implementations.
*/
public abstract class ConfigurationStoreBaseTest {
+ static final String TEST_USER = "testUser";
+ YarnConfigurationStore confStore = createConfStore();
+ Configuration conf;
+ Configuration schedConf;
+ RMContext rmContext;
- protected YarnConfigurationStore confStore = createConfStore();
-
- protected abstract YarnConfigurationStore createConfStore();
-
- protected Configuration conf;
- protected Configuration schedConf;
- protected RMContext rmContext;
-
- protected static final String TEST_USER = "testUser";
+ abstract YarnConfigurationStore createConfStore();
@Before
public void setUp() throws Exception {
@@ -59,20 +58,10 @@ public void testConfigurationUpdate() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val1", confStore.retrieve().get("key1"));
- Map update1 = new HashMap<>();
- update1.put("keyUpdate1", "valUpdate1");
- YarnConfigurationStore.LogMutation mutation1 =
- new YarnConfigurationStore.LogMutation(update1, TEST_USER);
- confStore.logMutation(mutation1);
- confStore.confirmMutation(mutation1, true);
+ confStore.confirmMutation(prepareLogMutation("keyUpdate1", "valUpdate1"), true);
assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
- Map update2 = new HashMap<>();
- update2.put("keyUpdate2", "valUpdate2");
- YarnConfigurationStore.LogMutation mutation2 =
- new YarnConfigurationStore.LogMutation(update2, TEST_USER);
- confStore.logMutation(mutation2);
- confStore.confirmMutation(mutation2, false);
+ confStore.confirmMutation(prepareLogMutation("keyUpdate2", "valUpdate2"), false);
assertNull("Configuration should not be updated",
confStore.retrieve().get("keyUpdate2"));
confStore.close();
@@ -84,13 +73,19 @@ public void testNullConfigurationUpdate() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val", confStore.retrieve().get("key"));
+ confStore.confirmMutation(prepareLogMutation("key", null), true);
+ assertNull(confStore.retrieve().get("key"));
+ confStore.close();
+ }
+
+ YarnConfigurationStore.LogMutation prepareLogMutation(String key,
+ String value) throws Exception {
Map update = new HashMap<>();
- update.put("key", null);
+ update.put(key, value);
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
- confStore.confirmMutation(mutation, true);
- assertNull(confStore.retrieve().get("key"));
- confStore.close();
+
+ return mutation;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java
new file mode 100644
index 00000000000..2951ef2064d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.function.Supplier;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public abstract class PersistentConfigurationStoreBaseTest extends
+ ConfigurationStoreBaseTest {
+
+ @Test
+ public void testVersioning() throws Exception {
+ confStore.initialize(conf, schedConf, rmContext);
+ assertNull(confStore.getConfStoreVersion());
+ confStore.checkVersion();
+ assertEquals(LeveldbConfigurationStore.CURRENT_VERSION_INFO,
+ confStore.getConfStoreVersion());
+ confStore.close();
+ }
+
+ @Test
+ public void testGetConfigurationVersion() throws Exception {
+ confStore.initialize(conf, schedConf, rmContext);
+ long v1 = confStore.getConfigVersion();
+ assertEquals(1, v1);
+ confStore.confirmMutation(prepareLogMutation("keyver", "valver"), true);
+ long v2 = confStore.getConfigVersion();
+ assertEquals(2, v2);
+ confStore.close();
+ }
+
+ @Test
+ public void testPersistConfiguration() throws Exception {
+ schedConf.set("key", "val");
+ confStore.initialize(conf, schedConf, rmContext);
+ assertEquals("val", confStore.retrieve().get("key"));
+ confStore.close();
+
+ // Create a new configuration store, and check for old configuration
+ confStore = createConfStore();
+ schedConf.set("key", "badVal");
+ // Should ignore passed-in scheduler configuration.
+ confStore.initialize(conf, schedConf, rmContext);
+ assertEquals("val", confStore.retrieve().get("key"));
+ confStore.close();
+ }
+
+ @Test
+ public void testPersistUpdatedConfiguration() throws Exception {
+ confStore.initialize(conf, schedConf, rmContext);
+ assertNull(confStore.retrieve().get("key"));
+
+ confStore.confirmMutation(prepareLogMutation("key", "val"), true);
+ assertEquals("val", confStore.retrieve().get("key"));
+ confStore.close();
+
+ // Create a new configuration store, and check for updated configuration
+ confStore = createConfStore();
+ schedConf.set("key", "badVal");
+ // Should ignore passed-in scheduler configuration.
+ confStore.initialize(conf, schedConf, rmContext);
+ assertEquals("val", confStore.retrieve().get("key"));
+ confStore.close();
+ }
+
+ void testMaxLogsHelper(Supplier> logSupplier)
+ throws Exception {
+ conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
+ confStore.initialize(conf, schedConf, rmContext);
+ LinkedList logs = logSupplier.get();
+ assertEquals(0, logs.size());
+
+ YarnConfigurationStore.LogMutation mutation = prepareLogMutation("key1", "val1");
+ logs = logSupplier.get();
+ assertEquals(1, logs.size());
+ assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+ confStore.confirmMutation(mutation, true);
+ assertEquals(1, logs.size());
+ assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+
+ mutation = prepareLogMutation("key2", "val2");
+ logs = logSupplier.get();
+ assertEquals(2, logs.size());
+ assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+ assertEquals("val2", logs.get(1).getUpdates().get("key2"));
+ confStore.confirmMutation(mutation, true);
+ assertEquals(2, logs.size());
+ assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+ assertEquals("val2", logs.get(1).getUpdates().get("key2"));
+
+ // Next update should purge first update from logs.
+ mutation = prepareLogMutation("key3", "val3");
+ logs = logSupplier.get();
+ assertEquals(2, logs.size());
+ assertEquals("val2", logs.get(0).getUpdates().get("key2"));
+ assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+ confStore.confirmMutation(mutation, true);
+ assertEquals(2, logs.size());
+ assertEquals("val2", logs.get(0).getUpdates().get("key2"));
+ assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+ }
+
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
index e4ca3d3ada7..dda5f5de925 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,11 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -32,19 +27,24 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
+import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
/**
* Tests {@link FSSchedulerConfigurationStore}.
*/
public class TestFSSchedulerConfigurationStore {
+ private static final String TEST_USER = "test";
private FSSchedulerConfigurationStore configurationStore;
private Configuration conf;
private File testSchedulerConfigurationDir;
@@ -90,35 +90,27 @@ public void confirmMutationWithValid() throws Exception {
Configuration storeConf = configurationStore.retrieve();
compareConfig(conf, storeConf);
- Map updates = new HashMap<>();
- updates.put("a", null);
- updates.put("b", "bb");
-
Configuration expectConfig = new Configuration(conf);
expectConfig.unset("a");
expectConfig.set("b", "bb");
- LogMutation logMutation = new LogMutation(updates, "test");
- configurationStore.logMutation(logMutation);
- configurationStore.confirmMutation(logMutation, true);
+ prepareParameterizedLogMutation(configurationStore, true, "a", null, "b", "bb");
storeConf = configurationStore.retrieve();
- assertEquals(null, storeConf.get("a"));
+ assertNull(storeConf.get("a"));
assertEquals("bb", storeConf.get("b"));
assertEquals("c", storeConf.get("c"));
compareConfig(expectConfig, storeConf);
- updates.put("b", "bbb");
- configurationStore.logMutation(logMutation);
- configurationStore.confirmMutation(logMutation, true);
+ prepareParameterizedLogMutation(configurationStore, true, "a", null, "b", "bbb");
storeConf = configurationStore.retrieve();
- assertEquals(null, storeConf.get("a"));
+ assertNull(storeConf.get("a"));
assertEquals("bbb", storeConf.get("b"));
assertEquals("c", storeConf.get("c"));
}
@Test
- public void confirmMutationWithInValid() throws Exception {
+ public void confirmMutationWithInvalid() throws Exception {
conf.set("a", "a");
conf.set("b", "b");
conf.set("c", "c");
@@ -127,13 +119,7 @@ public void confirmMutationWithInValid() throws Exception {
Configuration storeConf = configurationStore.retrieve();
compareConfig(conf, storeConf);
- Map updates = new HashMap<>();
- updates.put("a", null);
- updates.put("b", "bb");
-
- LogMutation logMutation = new LogMutation(updates, "test");
- configurationStore.logMutation(logMutation);
- configurationStore.confirmMutation(logMutation, false);
+ prepareParameterizedLogMutation(configurationStore, false, "a", null, "b", "bb");
storeConf = configurationStore.retrieve();
compareConfig(conf, storeConf);
@@ -142,7 +128,7 @@ public void confirmMutationWithInValid() throws Exception {
@Test
public void testFileSystemClose() throws Exception {
MiniDFSCluster hdfsCluster = null;
- FileSystem fs = null;
+ FileSystem fs;
Path path = new Path("/tmp/confstore");
try {
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
@@ -164,11 +150,7 @@ public void testFileSystemClose() throws Exception {
fs.close();
try {
- Map updates = new HashMap<>();
- updates.put("testkey", "testvalue");
- LogMutation logMutation = new LogMutation(updates, "test");
- configStore.logMutation(logMutation);
- configStore.confirmMutation(logMutation, true);
+ prepareParameterizedLogMutation(configStore, true, "testkey", "testvalue");
} catch (IOException e) {
if (e.getMessage().contains("Filesystem closed")) {
fail("FSSchedulerConfigurationStore failed to handle " +
@@ -178,13 +160,12 @@ public void testFileSystemClose() throws Exception {
}
}
} finally {
+ assert hdfsCluster != null;
fs = hdfsCluster.getFileSystem();
if (fs.exists(path)) {
fs.delete(path, true);
}
- if (hdfsCluster != null) {
- hdfsCluster.shutdown();
- }
+ hdfsCluster.shutdown();
}
}
@@ -197,15 +178,14 @@ public void testFormatConfiguration() throws Exception {
Configuration storedConfig = configurationStore.retrieve();
assertEquals("a", storedConfig.get("a"));
configurationStore.format();
- boolean exceptionCaught = false;
try {
- storedConfig = configurationStore.retrieve();
+ configurationStore.retrieve();
+ fail("Expected an IOException with message containing \"no capacity " +
+ "scheduler file in\" to be thrown");
} catch (IOException e) {
- if (e.getMessage().contains("no capacity scheduler file in")) {
- exceptionCaught = true;
- }
+ assertThat(e.getMessage(),
+ CoreMatchers.containsString("no capacity scheduler file in"));
}
- assertTrue(exceptionCaught);
}
@Test
@@ -227,12 +207,12 @@ public void checkVersion() {
try {
configurationStore.checkVersion();
} catch (Exception e) {
- fail("checkVersion throw exception");
+ fail("checkVersion throws exception");
}
}
private void compareConfig(Configuration schedulerConf,
- Configuration storedConfig) {
+ Configuration storedConfig) {
for (Map.Entry entry : schedulerConf) {
assertEquals(entry.getKey(), schedulerConf.get(entry.getKey()),
storedConfig.get(entry.getKey()));
@@ -243,4 +223,22 @@ private void compareConfig(Configuration schedulerConf,
schedulerConf.get(entry.getKey()));
}
}
+
+ private void prepareParameterizedLogMutation(FSSchedulerConfigurationStore configStore,
+ boolean validityFlag, String... values)
+ throws Exception {
+ Map updates = new HashMap<>();
+ String key;
+ String value;
+
+ for (int i = 1; i <= values.length; i += 2) {
+ key = values[i - 1];
+ value = i == values.length ? null : values[i];
+ updates.put(key, value);
+ }
+
+ LogMutation logMutation = new LogMutation(updates, TEST_USER);
+ configStore.logMutation(logMutation);
+ configStore.confirmMutation(logMutation, validityFlag);
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java
index c40d16a27cb..7f4d397eb12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java
@@ -18,6 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
/**
* Tests {@link InMemoryConfigurationStore}.
*/
@@ -27,4 +31,13 @@
protected YarnConfigurationStore createConfStore() {
return new InMemoryConfigurationStore();
}
+
+ @Test
+ public void checkVersion() {
+ try {
+ confStore.checkVersion();
+ } catch (Exception e) {
+ fail("checkVersion throw exception");
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
index 0ae7624d78c..63a320dc5cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,11 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
@@ -30,25 +29,26 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
-import org.junit.Before;
-import org.junit.Test;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
+import java.util.function.Supplier;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.*;
/**
* Tests {@link LeveldbConfigurationStore}.
*/
-public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
+public class TestLeveldbConfigurationStore extends
+ PersistentConfigurationStoreBaseTest {
public static final Logger LOG =
LoggerFactory.getLogger(TestLeveldbConfigurationStore.class);
@@ -68,53 +68,19 @@ public void setUp() throws Exception {
conf.set(YarnConfiguration.RM_SCHEDCONF_STORE_PATH, TEST_DIR.toString());
}
- @Test
- public void testVersioning() throws Exception {
- confStore.initialize(conf, schedConf, rmContext);
- assertNull(confStore.getConfStoreVersion());
- confStore.checkVersion();
- assertEquals(LeveldbConfigurationStore.CURRENT_VERSION_INFO,
- confStore.getConfStoreVersion());
- confStore.close();
- }
-
- @Test
- public void testPersistConfiguration() throws Exception {
- schedConf.set("key", "val");
- confStore.initialize(conf, schedConf, rmContext);
- assertEquals("val", confStore.retrieve().get("key"));
- confStore.close();
-
- // Create a new configuration store, and check for old configuration
- confStore = createConfStore();
- schedConf.set("key", "badVal");
- // Should ignore passed-in scheduler configuration.
- confStore.initialize(conf, schedConf, rmContext);
- assertEquals("val", confStore.retrieve().get("key"));
- confStore.close();
- }
+ @Test(expected = YarnConfStoreVersionIncompatibleException.class)
+ public void testIncompatibleVersion() throws Exception {
+ try {
+ confStore.initialize(conf, schedConf, rmContext);
- @Test
- public void testPersistUpdatedConfiguration() throws Exception {
- confStore.initialize(conf, schedConf, rmContext);
- assertNull(confStore.retrieve().get("key"));
-
- Map update = new HashMap<>();
- update.put("key", "val");
- YarnConfigurationStore.LogMutation mutation =
- new YarnConfigurationStore.LogMutation(update, TEST_USER);
- confStore.logMutation(mutation);
- confStore.confirmMutation(mutation, true);
- assertEquals("val", confStore.retrieve().get("key"));
- confStore.close();
+ Version otherVersion = Version.newInstance(1, 1);
+ ((LeveldbConfigurationStore) confStore).storeVersion(otherVersion);
- // Create a new configuration store, and check for updated configuration
- confStore = createConfStore();
- schedConf.set("key", "badVal");
- // Should ignore passed-in scheduler configuration.
- confStore.initialize(conf, schedConf, rmContext);
- assertEquals("val", confStore.retrieve().get("key"));
- confStore.close();
+ assertEquals(otherVersion, confStore.getConfStoreVersion());
+ confStore.checkVersion();
+ } finally {
+ confStore.close();
+ }
}
@Test
@@ -122,11 +88,7 @@ public void testDisableAuditLogs() throws Exception {
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0);
confStore.initialize(conf, schedConf, rmContext);
- Map update = new HashMap<>();
- update.put("key1", "val1");
- YarnConfigurationStore.LogMutation mutation =
- new YarnConfigurationStore.LogMutation(update, TEST_USER);
- confStore.logMutation(mutation);
+ prepareLogMutation("key1", "val1");
boolean logKeyPresent = false;
DB db = ((LeveldbConfigurationStore) confStore).getDB();
@@ -146,56 +108,22 @@ public void testDisableAuditLogs() throws Exception {
@Test
public void testMaxLogs() throws Exception {
- conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
- confStore.initialize(conf, schedConf, rmContext);
- LinkedList logs =
- ((LeveldbConfigurationStore) confStore).getLogs();
- assertEquals(0, logs.size());
-
- Map update1 = new HashMap<>();
- update1.put("key1", "val1");
- YarnConfigurationStore.LogMutation mutation =
- new YarnConfigurationStore.LogMutation(update1, TEST_USER);
- confStore.logMutation(mutation);
- logs = ((LeveldbConfigurationStore) confStore).getLogs();
- assertEquals(1, logs.size());
- assertEquals("val1", logs.get(0).getUpdates().get("key1"));
- confStore.confirmMutation(mutation, true);
- assertEquals(1, logs.size());
- assertEquals("val1", logs.get(0).getUpdates().get("key1"));
-
- Map update2 = new HashMap<>();
- update2.put("key2", "val2");
- mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
- confStore.logMutation(mutation);
- logs = ((LeveldbConfigurationStore) confStore).getLogs();
- assertEquals(2, logs.size());
- assertEquals("val1", logs.get(0).getUpdates().get("key1"));
- assertEquals("val2", logs.get(1).getUpdates().get("key2"));
- confStore.confirmMutation(mutation, true);
- assertEquals(2, logs.size());
- assertEquals("val1", logs.get(0).getUpdates().get("key1"));
- assertEquals("val2", logs.get(1).getUpdates().get("key2"));
-
- // Next update should purge first update from logs.
- Map update3 = new HashMap<>();
- update3.put("key3", "val3");
- mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);
- confStore.logMutation(mutation);
- logs = ((LeveldbConfigurationStore) confStore).getLogs();
- assertEquals(2, logs.size());
- assertEquals("val2", logs.get(0).getUpdates().get("key2"));
- assertEquals("val3", logs.get(1).getUpdates().get("key3"));
- confStore.confirmMutation(mutation, true);
- assertEquals(2, logs.size());
- assertEquals("val2", logs.get(0).getUpdates().get("key2"));
- assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+ Supplier> logSupplier = () -> {
+ try {
+ return ((LeveldbConfigurationStore) confStore).getLogs();
+ } catch (Exception e) {
+ fail("getLogs should not throw exception");
+ return new LinkedList<>();
+ }
+ };
+ testMaxLogsHelper(logSupplier);
confStore.close();
}
/**
* When restarting, RM should read from current state of store, including
* any updates from the previous RM instance.
+ *
* @throws Exception
*/
@Test
@@ -222,7 +150,7 @@ public void testRestartReadsFromUpdatedStore() throws Exception {
.getConfStore().retrieve().get("key"));
// Next update is not persisted, it should not be recovered
schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
- log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
rm1.close();
// Start RM2 and verifies it starts with updated configuration
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
index e67e382f71e..6a6d9aed022 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,9 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
-import org.apache.hadoop.util.curator.ZKCuratorManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
@@ -30,8 +27,11 @@
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
@@ -41,25 +41,25 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
+import java.util.*;
+import java.util.function.Supplier;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
/**
* Tests {@link ZKConfigurationStore}.
*/
-public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
-
+public class TestZKConfigurationStore extends
+ PersistentConfigurationStoreBaseTest {
public static final Logger LOG =
LoggerFactory.getLogger(TestZKConfigurationStore.class);
@@ -75,7 +75,7 @@ public static TestingServer setupCuratorServer() throws Exception {
}
public static CuratorFramework setupCuratorFramework(
- TestingServer curatorTestingServer) throws Exception {
+ TestingServer curatorTestingServer) {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(curatorTestingServer.getConnectString())
.retryPolicy(new RetryNTimes(100, 100))
@@ -85,6 +85,7 @@ public static CuratorFramework setupCuratorFramework(
}
@Before
+ @Override
public void setUp() throws Exception {
super.setUp();
curatorTestingServer = setupCuratorServer();
@@ -104,32 +105,28 @@ public void cleanup() throws IOException {
curatorTestingServer.stop();
}
- @Test
- public void testVersioning() throws Exception {
+ @Test(expected = YarnConfStoreVersionIncompatibleException.class)
+ public void testIncompatibleVersion() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
- assertNull(confStore.getConfStoreVersion());
- confStore.checkVersion();
- assertEquals(ZKConfigurationStore.CURRENT_VERSION_INFO,
- confStore.getConfStoreVersion());
- }
- @Test
- public void testPersistConfiguration() throws Exception {
- schedConf.set("key", "val");
- confStore.initialize(conf, schedConf, rmContext);
- assertEquals("val", confStore.retrieve().get("key"));
-
- assertNull(confStore.retrieve().get(YarnConfiguration.RM_HOSTNAME));
-
- // Create a new configuration store, and check for old configuration
- confStore = createConfStore();
- schedConf.set("key", "badVal");
- // Should ignore passed-in scheduler configuration.
- confStore.initialize(conf, schedConf, rmContext);
- assertEquals("val", confStore.retrieve().get("key"));
+ Version otherVersion = Version.newInstance(1, 1);
+ String znodeParentPath = conf.get(YarnConfiguration.
+ RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
+ YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
+ String zkVersionPath = ZKCuratorManager.getNodePath(znodeParentPath,
+ "VERSION");
+ String fencingNodePath = ZKCuratorManager.getNodePath(znodeParentPath,
+ "FENCING");
+ byte[] versionData =
+ ((VersionPBImpl) otherVersion).getProto().toByteArray();
+ List zkAcl = ZKCuratorManager.getZKAcls(conf);
+ ((ZKConfigurationStore) confStore).zkManager.safeCreate(zkVersionPath,
+ versionData, zkAcl, CreateMode.PERSISTENT, zkAcl, fencingNodePath);
+
+ assertEquals(otherVersion, confStore.getConfStoreVersion());
+ confStore.checkVersion();
}
-
@Test
public void testFormatConfiguration() throws Exception {
schedConf.set("key", "val");
@@ -139,113 +136,38 @@ public void testFormatConfiguration() throws Exception {
assertNull(confStore.retrieve());
}
- @Test
- public void testGetConfigurationVersion() throws Exception {
- confStore.initialize(conf, schedConf, rmContext);
- long v1 = confStore.getConfigVersion();
- assertEquals(1, v1);
- Map update = new HashMap<>();
- update.put("keyver", "valver");
- YarnConfigurationStore.LogMutation mutation =
- new YarnConfigurationStore.LogMutation(update, TEST_USER);
- confStore.logMutation(mutation);
- confStore.confirmMutation(mutation, true);
- long v2 = confStore.getConfigVersion();
- assertEquals(2, v2);
- }
-
- @Test
- public void testPersistUpdatedConfiguration() throws Exception {
- confStore.initialize(conf, schedConf, rmContext);
- assertNull(confStore.retrieve().get("key"));
-
- Map update = new HashMap<>();
- update.put("key", "val");
- LogMutation mutation = new LogMutation(update, TEST_USER);
- confStore.logMutation(mutation);
- confStore.confirmMutation(mutation, true);
- assertEquals("val", confStore.retrieve().get("key"));
-
- // Create a new configuration store, and check for updated configuration
- confStore = createConfStore();
- schedConf.set("key", "badVal");
- // Should ignore passed-in scheduler configuration.
- confStore.initialize(conf, schedConf, rmContext);
- assertEquals("val", confStore.retrieve().get("key"));
- }
-
- @Test
- public void testMaxLogs() throws Exception {
- conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
- confStore.initialize(conf, schedConf, rmContext);
- LinkedList logs =
- ((ZKConfigurationStore) confStore).getLogs();
- assertEquals(0, logs.size());
-
- Map update1 = new HashMap<>();
- update1.put("key1", "val1");
- YarnConfigurationStore.LogMutation mutation =
- new YarnConfigurationStore.LogMutation(update1, TEST_USER);
- confStore.logMutation(mutation);
- logs = ((ZKConfigurationStore) confStore).getLogs();
- assertEquals(1, logs.size());
- assertEquals("val1", logs.get(0).getUpdates().get("key1"));
- confStore.confirmMutation(mutation, true);
- assertEquals(1, logs.size());
- assertEquals("val1", logs.get(0).getUpdates().get("key1"));
-
- Map update2 = new HashMap<>();
- update2.put("key2", "val2");
- mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
- confStore.logMutation(mutation);
- logs = ((ZKConfigurationStore) confStore).getLogs();
- assertEquals(2, logs.size());
- assertEquals("val1", logs.get(0).getUpdates().get("key1"));
- assertEquals("val2", logs.get(1).getUpdates().get("key2"));
- confStore.confirmMutation(mutation, true);
- assertEquals(2, logs.size());
- assertEquals("val1", logs.get(0).getUpdates().get("key1"));
- assertEquals("val2", logs.get(1).getUpdates().get("key2"));
-
- // Next update should purge first update from logs.
- Map update3 = new HashMap<>();
- update3.put("key3", "val3");
- mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);
- confStore.logMutation(mutation);
- logs = ((ZKConfigurationStore) confStore).getLogs();
- assertEquals(2, logs.size());
- assertEquals("val2", logs.get(0).getUpdates().get("key2"));
- assertEquals("val3", logs.get(1).getUpdates().get("key3"));
- confStore.confirmMutation(mutation, true);
- assertEquals(2, logs.size());
- assertEquals("val2", logs.get(0).getUpdates().get("key2"));
- assertEquals("val3", logs.get(1).getUpdates().get("key3"));
- }
-
-
@Test
public void testDisableAuditLogs() throws Exception {
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0);
confStore.initialize(conf, schedConf, rmContext);
String znodeParentPath = conf.get(YarnConfiguration.
- RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
+ RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
String logsPath = ZKCuratorManager.getNodePath(znodeParentPath, "LOGS");
byte[] data = null;
((ZKConfigurationStore) confStore).zkManager.setData(logsPath, data, -1);
- Map update = new HashMap<>();
- update.put("key1", "val1");
- YarnConfigurationStore.LogMutation mutation =
- new YarnConfigurationStore.LogMutation(update, TEST_USER);
- confStore.logMutation(mutation);
+ prepareLogMutation("key1", "val1");
data = ((ZKConfigurationStore) confStore).zkManager.getData(logsPath);
assertNull("Failed to Disable Audit Logs", data);
}
+ @Test
+ public void testMaxLogs() throws Exception {
+ Supplier> logSupplier = () -> {
+ try {
+ return ((ZKConfigurationStore) confStore).getLogs();
+ } catch (Exception e) {
+ fail("getLogs should not throw an Exception");
+ return new LinkedList<>();
+ }
+ };
+ testMaxLogsHelper(logSupplier);
+ }
+
public Configuration createRMHAConf(String rmIds, String rmId,
- int adminPort) {
+ int adminPort) {
Configuration conf = new YarnConfiguration();
this.conf.setClass(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class, CapacityScheduler.class);
@@ -274,13 +196,14 @@ public Configuration createRMHAConf(String rmIds, String rmId,
/**
* When failing over, new active RM should read from current state of store,
* including any updates when the new active RM was in standby.
+ *
* @throws Exception
*/
@Test
public void testFailoverReadsFromUpdatedStore() throws Exception {
HAServiceProtocol.StateChangeRequestInfo req =
new HAServiceProtocol.StateChangeRequestInfo(
- HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+ HAServiceProtocol.RequestSource.REQUEST_BY_USER);
Configuration conf1 = createRMHAConf("rm1,rm2", "rm1", 1234);
ResourceManager rm1 = new MockRM(conf1);
@@ -318,7 +241,7 @@ public void testFailoverReadsFromUpdatedStore() throws Exception {
.getConfStore().retrieve().get("key"));
// Next update is not persisted, it should not be recovered
schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
- log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
// Start RM2 and verifies it starts with updated configuration
rm2.getRMContext().getRMAdminService().transitionToActive(req);
@@ -360,6 +283,7 @@ public void testFailoverReadsFromUpdatedStore() throws Exception {
/**
* When failing over, if RM1 stopped and removed a queue that RM2 has in
* memory, failing over to RM2 should not throw an exception.
+ *
* @throws Exception
*/
@Test
@@ -406,14 +330,14 @@ public void testFailoverAfterRemoveQueue() throws Exception {
rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
confProvider.confirmPendingMutation(log, true);
assertTrue(Arrays.asList(((MutableConfScheduler) rm1.getResourceScheduler())
- .getConfiguration().get("yarn.scheduler.capacity.root.queues").split
- (",")).contains("a"));
+ .getConfiguration().get("yarn.scheduler.capacity.root.queues").split(
+ ",")).contains("a"));
// Remove root.default
schedConfUpdateInfo.getUpdateQueueInfo().clear();
schedConfUpdateInfo.getAddQueueInfo().clear();
schedConfUpdateInfo.getRemoveQueueInfo().add("root.default");
- log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
confProvider.confirmPendingMutation(log, true);
assertEquals("a", ((MutableConfScheduler) rm1.getResourceScheduler())