diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java index d031ea984ff..8e790dd69bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java @@ -87,13 +87,13 @@ public long getConfigVersion() { } @Override - public Version getConfStoreVersion() throws Exception { + public Version getConfStoreVersion() { // Does nothing. return null; } @Override - public void storeVersion() throws Exception { + public void storeVersion() { // Does nothing. } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfStoreVersionIncompatibleException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfStoreVersionIncompatibleException.java new file mode 100644 index 00000000000..e9bcc2933a4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfStoreVersionIncompatibleException.java @@ -0,0 +1,21 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +public class YarnConfStoreVersionIncompatibleException extends + YarnException { + private static final long serialVersionUID = -2829858253579013629L; + + public YarnConfStoreVersionIncompatibleException(Throwable cause) { + super(cause); + } + + public YarnConfStoreVersionIncompatibleException(String message) { + super(message); + } + + public YarnConfStoreVersionIncompatibleException( + String message, Throwable cause) { + super(message, cause); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java index 34aa17428c2..55aadbea9d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,13 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateVersionIncompatibleException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; @@ -38,7 +37,7 @@ * should first log it with {@code logMutation}, which persists this pending * mutation. This mutation is merged to the persisted configuration only after * {@code confirmMutation} is called. - * + *

* On startup/recovery, caller should call {@code retrieve} to get all * confirmed mutations, then get pending mutations which were not confirmed via * {@code getPendingMutations}, and replay/confirm them via @@ -48,61 +47,31 @@ public static final Logger LOG = LoggerFactory.getLogger(YarnConfigurationStore.class); - /** - * LogMutation encapsulates the fields needed for configuration mutation - * audit logging and recovery. - */ - public static class LogMutation implements Serializable { - private Map updates; - private String user; - - /** - * Create log mutation. - * @param updates key-value configuration updates - * @param user user who requested configuration change - */ - LogMutation(Map updates, String user) { - this.updates = updates; - this.user = user; - } - - /** - * Get key-value configuration updates. - * @return map of configuration updates - */ - public Map getUpdates() { - return updates; - } - - /** - * Get user who requested configuration change. - * @return user who requested configuration change - */ - public String getUser() { - return user; - } - } /** * Initialize the configuration store, with schedConf as the initial * scheduler configuration. If a persisted store already exists, use the * scheduler configuration stored there, and ignore schedConf. - * @param conf configuration to initialize store with + * + * @param conf configuration to initialize store with * @param schedConf Initial key-value scheduler configuration to persist. * @param rmContext RMContext for this configuration store * @throws IOException if initialization fails */ public abstract void initialize(Configuration conf, Configuration schedConf, - RMContext rmContext) throws Exception; + RMContext rmContext) throws Exception; /** * Closes the configuration store, releasing any required resources. + * * @throws IOException on failure to close */ - public void close() throws IOException {} + public void close() throws IOException { + } /** * Logs the configuration change to backing store. + * * @param logMutation configuration change to be persisted in write ahead log * @throws IOException if logging fails */ @@ -113,35 +82,39 @@ public void close() throws IOException {} * last logged by {@code logMutation} and marks the mutation as persisted (no * longer pending). If isValid is true, merge the mutation with the persisted * configuration. + * * @param pendingMutation the log mutation to apply - * @param isValid if true, update persisted configuration with pending - * mutation. + * @param isValid if true, update persisted configuration with pending + * mutation. * @throws Exception if mutation confirmation fails */ public abstract void confirmMutation(LogMutation pendingMutation, - boolean isValid) throws Exception; + boolean isValid) throws Exception; /** * Retrieve the persisted configuration. + * * @return configuration as key-value */ public abstract Configuration retrieve() throws IOException; - /** * Format the persisted configuration. + * * @throws IOException on failure to format */ public abstract void format() throws Exception; /** * Get the last updated config version. + * * @return Last updated config version. */ public abstract long getConfigVersion() throws Exception; /** * Get a list of confirmed configuration mutations starting from a given id. + * * @param fromId id from which to start getting mutations, inclusive * @return list of configuration mutations */ @@ -150,6 +123,7 @@ public abstract void confirmMutation(LogMutation pendingMutation, /** * Get schema version of persisted conf store, for detecting compatibility * issues when changing conf store schema. + * * @return Schema version currently used by the persisted configuration store. * @throws Exception On version fetch failure */ @@ -157,6 +131,7 @@ public abstract void confirmMutation(LogMutation pendingMutation, /** * Persist the hard-coded schema version to the conf store. + * * @throws Exception On storage failure */ protected abstract void storeVersion() throws Exception; @@ -164,30 +139,71 @@ public abstract void confirmMutation(LogMutation pendingMutation, /** * Get the hard-coded schema version, for comparison against the schema * version currently persisted. + * * @return Current hard-coded schema version */ protected abstract Version getCurrentVersion(); public void checkVersion() throws Exception { - // TODO this was taken from RMStateStore. Should probably refactor Version loadedVersion = getConfStoreVersion(); - LOG.info("Loaded configuration store version info " + loadedVersion); - if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) { + Version currentVersion = getCurrentVersion(); + LOG.info("Loaded configuration store version info {}", loadedVersion); + + // when currentVersion is null, the checkVersion method is overridden + if (currentVersion.equals(loadedVersion)) { return; } // if there is no version info, treat it as CURRENT_VERSION_INFO; if (loadedVersion == null) { - loadedVersion = getCurrentVersion(); + loadedVersion = currentVersion; } - if (loadedVersion.isCompatibleTo(getCurrentVersion())) { - LOG.info("Storing configuration store version info " - + getCurrentVersion()); + + if (loadedVersion.isCompatibleTo(currentVersion)) { + LOG.info("Storing configuration store version info {}", currentVersion); storeVersion(); } else { - throw new RMStateVersionIncompatibleException( - "Expecting configuration store version " + getCurrentVersion() + throw new YarnConfStoreVersionIncompatibleException( + "Expecting configuration store version " + currentVersion + ", but loading version " + loadedVersion); } } + /** + * LogMutation encapsulates the fields needed for configuration mutation + * audit logging and recovery. + */ + public static class LogMutation implements Serializable { + private Map updates; + private String user; + + /** + * Create log mutation. + * + * @param updates key-value configuration updates + * @param user user who requested configuration change + */ + LogMutation(Map updates, String user) { + this.updates = updates; + this.user = user; + } + + /** + * Get key-value configuration updates. + * + * @return map of configuration updates + */ + public Map getUpdates() { + return updates; + } + + /** + * Get user who requested configuration change. + * + * @return user who requested configuration change + */ + public String getUser() { + return user; + } + } + }