diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
index d031ea984ff..8e790dd69bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
@@ -87,13 +87,13 @@ public long getConfigVersion() {
}
@Override
- public Version getConfStoreVersion() throws Exception {
+ public Version getConfStoreVersion() {
// Does nothing.
return null;
}
@Override
- public void storeVersion() throws Exception {
+ public void storeVersion() {
// Does nothing.
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfStoreVersionIncompatibleException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfStoreVersionIncompatibleException.java
new file mode 100644
index 00000000000..e9bcc2933a4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfStoreVersionIncompatibleException.java
@@ -0,0 +1,21 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+public class YarnConfStoreVersionIncompatibleException extends
+ YarnException {
+ private static final long serialVersionUID = -2829858253579013629L;
+
+ public YarnConfStoreVersionIncompatibleException(Throwable cause) {
+ super(cause);
+ }
+
+ public YarnConfStoreVersionIncompatibleException(String message) {
+ super(message);
+ }
+
+ public YarnConfStoreVersionIncompatibleException(
+ String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index 34aa17428c2..437190d3d28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,13 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateVersionIncompatibleException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -38,7 +37,7 @@
* should first log it with {@code logMutation}, which persists this pending
* mutation. This mutation is merged to the persisted configuration only after
* {@code confirmMutation} is called.
- *
+ *
* On startup/recovery, caller should call {@code retrieve} to get all
* confirmed mutations, then get pending mutations which were not confirmed via
* {@code getPendingMutations}, and replay/confirm them via
@@ -48,61 +47,31 @@
public static final Logger LOG =
LoggerFactory.getLogger(YarnConfigurationStore.class);
- /**
- * LogMutation encapsulates the fields needed for configuration mutation
- * audit logging and recovery.
- */
- public static class LogMutation implements Serializable {
- private Map updates;
- private String user;
-
- /**
- * Create log mutation.
- * @param updates key-value configuration updates
- * @param user user who requested configuration change
- */
- LogMutation(Map updates, String user) {
- this.updates = updates;
- this.user = user;
- }
-
- /**
- * Get key-value configuration updates.
- * @return map of configuration updates
- */
- public Map getUpdates() {
- return updates;
- }
-
- /**
- * Get user who requested configuration change.
- * @return user who requested configuration change
- */
- public String getUser() {
- return user;
- }
- }
/**
* Initialize the configuration store, with schedConf as the initial
* scheduler configuration. If a persisted store already exists, use the
* scheduler configuration stored there, and ignore schedConf.
- * @param conf configuration to initialize store with
+ *
+ * @param conf configuration to initialize store with
* @param schedConf Initial key-value scheduler configuration to persist.
* @param rmContext RMContext for this configuration store
* @throws IOException if initialization fails
*/
public abstract void initialize(Configuration conf, Configuration schedConf,
- RMContext rmContext) throws Exception;
+ RMContext rmContext) throws Exception;
/**
* Closes the configuration store, releasing any required resources.
+ *
* @throws IOException on failure to close
*/
- public void close() throws IOException {}
+ public void close() throws IOException {
+ }
/**
* Logs the configuration change to backing store.
+ *
* @param logMutation configuration change to be persisted in write ahead log
* @throws IOException if logging fails
*/
@@ -113,35 +82,39 @@ public void close() throws IOException {}
* last logged by {@code logMutation} and marks the mutation as persisted (no
* longer pending). If isValid is true, merge the mutation with the persisted
* configuration.
+ *
* @param pendingMutation the log mutation to apply
- * @param isValid if true, update persisted configuration with pending
- * mutation.
+ * @param isValid if true, update persisted configuration with pending
+ * mutation.
* @throws Exception if mutation confirmation fails
*/
public abstract void confirmMutation(LogMutation pendingMutation,
- boolean isValid) throws Exception;
+ boolean isValid) throws Exception;
/**
* Retrieve the persisted configuration.
+ *
* @return configuration as key-value
*/
public abstract Configuration retrieve() throws IOException;
-
/**
* Format the persisted configuration.
+ *
* @throws IOException on failure to format
*/
public abstract void format() throws Exception;
/**
* Get the last updated config version.
+ *
* @return Last updated config version.
*/
public abstract long getConfigVersion() throws Exception;
/**
* Get a list of confirmed configuration mutations starting from a given id.
+ *
* @param fromId id from which to start getting mutations, inclusive
* @return list of configuration mutations
*/
@@ -150,6 +123,7 @@ public abstract void confirmMutation(LogMutation pendingMutation,
/**
* Get schema version of persisted conf store, for detecting compatibility
* issues when changing conf store schema.
+ *
* @return Schema version currently used by the persisted configuration store.
* @throws Exception On version fetch failure
*/
@@ -157,6 +131,7 @@ public abstract void confirmMutation(LogMutation pendingMutation,
/**
* Persist the hard-coded schema version to the conf store.
+ *
* @throws Exception On storage failure
*/
protected abstract void storeVersion() throws Exception;
@@ -164,30 +139,67 @@ public abstract void confirmMutation(LogMutation pendingMutation,
/**
* Get the hard-coded schema version, for comparison against the schema
* version currently persisted.
+ *
* @return Current hard-coded schema version
*/
protected abstract Version getCurrentVersion();
public void checkVersion() throws Exception {
- // TODO this was taken from RMStateStore. Should probably refactor
Version loadedVersion = getConfStoreVersion();
- LOG.info("Loaded configuration store version info " + loadedVersion);
- if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
+ Version currentVersion = getCurrentVersion();
+ LOG.info("Loaded configuration store version info {}", loadedVersion);
+
+ // when currentVersion is null, the checkVersion method is overridden
+ if (currentVersion.equals(loadedVersion)) {
return;
}
// if there is no version info, treat it as CURRENT_VERSION_INFO;
- if (loadedVersion == null) {
- loadedVersion = getCurrentVersion();
- }
- if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
- LOG.info("Storing configuration store version info "
- + getCurrentVersion());
+ if (loadedVersion == null || currentVersion.isCompatibleTo(loadedVersion)) {
+ LOG.info("Storing configuration store version info {}", currentVersion);
storeVersion();
} else {
- throw new RMStateVersionIncompatibleException(
- "Expecting configuration store version " + getCurrentVersion()
+ throw new YarnConfStoreVersionIncompatibleException(
+ "Expecting configuration store version " + currentVersion
+ ", but loading version " + loadedVersion);
}
}
+ /**
+ * LogMutation encapsulates the fields needed for configuration mutation
+ * audit logging and recovery.
+ */
+ public static class LogMutation implements Serializable {
+ private Map updates;
+ private String user;
+
+ /**
+ * Create log mutation.
+ *
+ * @param updates key-value configuration updates
+ * @param user user who requested configuration change
+ */
+ LogMutation(Map updates, String user) {
+ this.updates = updates;
+ this.user = user;
+ }
+
+ /**
+ * Get key-value configuration updates.
+ *
+ * @return map of configuration updates
+ */
+ public Map getUpdates() {
+ return updates;
+ }
+
+ /**
+ * Get user who requested configuration change.
+ *
+ * @return user who requested configuration change
+ */
+ public String getUser() {
+ return user;
+ }
+ }
+
}