diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6ffe1f7ed3c..793a554a88c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -643,6 +643,7 @@ public static boolean isAclEnabled(Configuration conf) { YARN_PREFIX + "scheduler.configuration.store.class"; public static final String MEMORY_CONFIGURATION_STORE = "memory"; public static final String LEVELDB_CONFIGURATION_STORE = "leveldb"; + public static final String ZK_CONFIGURATION_STORE = "zk"; public static final String DEFAULT_CONFIGURATION_STORE = MEMORY_CONFIGURATION_STORE; public static final String RM_SCHEDCONF_STORE_PATH = YARN_PREFIX @@ -654,9 +655,16 @@ public static boolean isAclEnabled(Configuration conf) { public static final long DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS = 60 * 60 * 24L; - public static final String RM_SCHEDCONF_LEVELDB_MAX_LOGS = - YARN_PREFIX + "scheduler.configuration.leveldb-store.max-logs"; - public static final int DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000; + public static final String RM_SCHEDCONF_MAX_LOGS = + YARN_PREFIX + "scheduler.configuration.store.max-logs"; + public static final long DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000; + public static final long DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS = 1000; + + /** Parent znode path under which ZKConfigurationStore will create znodes. */ + public static final String RM_SCHEDCONF_STORE_ZK_PARENT_PATH = YARN_PREFIX + + "scheduler.configuration.zk-store.parent-path"; + public static final String DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH = + "/confstore"; public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS = YARN_PREFIX + "scheduler.configuration.mutation.acl-policy.class"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 2d6574f55b5..dd9e91375ec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3271,11 +3271,20 @@ - The max number of configuration change log entries kept in LevelDB config + The max number of configuration change log entries kept in config store, when yarn.scheduler.configuration.store.class is configured to be - "leveldb". Default is 1000. + "leveldb" or "zk". Default is 1000 for either. - yarn.scheduler.configuration.leveldb-store.max-logs + yarn.scheduler.configuration.store.max-logs 1000 + + + + ZK root node path for configuration store when using zookeeper-based + configuration store. 
+ + yarn.scheduler.configuration.zk-store.parent-path + /confstore + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index fd9e849f3c4..2b4d2dd401c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -387,9 +387,7 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) RefreshQueuesResponse response = recordFactory.newRecordInstance(RefreshQueuesResponse.class); try { - ResourceScheduler scheduler = rm.getRMContext().getScheduler(); - if (scheduler instanceof MutableConfScheduler - && ((MutableConfScheduler) scheduler).isConfigurationMutable()) { + if (isSchedulerMutable()) { throw new IOException("Scheduler configuration is mutable. " + operation + " is not allowed in this scenario."); } @@ -404,6 +402,19 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) @Private public void refreshQueues() throws IOException, YarnException { + refreshQueues(false); + } + + private void refreshQueues(boolean isActiveTransition) + throws IOException, YarnException { + if (isActiveTransition && isSchedulerMutable()) { + try { + ((MutableConfScheduler) rm.getRMContext().getScheduler()) + .refreshConfiguration(); + } catch (Exception e) { + throw new IOException("Failed to refresh configuration:", e); + } + } rm.getRMContext().getScheduler().reinitialize(getConfig(), this.rm.getRMContext()); // refresh the reservation system @@ -413,6 +424,12 @@ public void refreshQueues() throws IOException, YarnException { } } + private boolean isSchedulerMutable() { + ResourceScheduler scheduler = rm.getRMContext().getScheduler(); + return (scheduler instanceof MutableConfScheduler + && ((MutableConfScheduler) scheduler).isConfigurationMutable()); + } + @Override public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) throws YarnException, StandbyException { @@ -721,7 +738,7 @@ private synchronized Configuration getConfiguration(Configuration conf, void refreshAll() throws ServiceFailedException { try { checkAcls("refreshAll"); - refreshQueues(); + refreshQueues(true); refreshNodes(); refreshSuperUserGroupsConfiguration(); refreshUserToGroupsMappings(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java index 135868f1a57..d5fce36efba 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java @@ -22,7 
+22,7 @@ /** * This exception is thrown by ResourceManager if it's loading an incompatible - * version of state from state store on recovery. + * version of storage on recovery. */ public class RMStateVersionIncompatibleException extends YarnException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java index 313bf6aee1c..7fe900ef2e5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -41,6 +42,13 @@ void updateConfiguration(UserGroupInformation user, SchedConfUpdateInfo confUpdate) throws IOException, YarnException; /** + * Called when a new ResourceManager is starting/becomes active. Ensures + * configuration is up-to-date. + * @throws Exception if configuration could not be refreshed from store + */ + void refreshConfiguration() throws Exception; + + /** * Get the scheduler configuration. * @return the scheduler configuration */ @@ -58,4 +66,7 @@ void updateConfiguration(UserGroupInformation user, * @return whether scheduler configuration is mutable or not. */ boolean isConfigurationMutable(); + + @VisibleForTesting + MutableConfigurationProvider getMutableConfProvider(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java index 9baf1ad1ee2..482ef7a2819 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java @@ -18,12 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; -import java.io.IOException; - /** * Interface for allowing changing scheduler configurations. */ @@ -31,18 +31,20 @@ /** * Apply transactions which were not committed. 
- * @throws IOException if recovery fails + * @throws Exception if recovery fails */ - void recoverConf() throws IOException; + void recoverConf() throws Exception; /** * Update the scheduler configuration with the provided key value pairs. * @param user User issuing the request * @param confUpdate Key-value pairs for configurations to be updated. - * @throws IOException if scheduler could not be reinitialized + * @throws Exception if scheduler could not be reinitialized * @throws YarnException if reservation system could not be reinitialized */ void mutateConfiguration(UserGroupInformation user, SchedConfUpdateInfo - confUpdate) throws IOException, YarnException; + confUpdate) throws Exception; + @VisibleForTesting + YarnConfigurationStore getConfStore(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 1091fcd1680..5d052f71ba4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -391,9 +391,7 @@ public void serviceInit(Configuration conf) throws Exception { @Override public void serviceStart() throws Exception { startSchedulerThreads(); - if (this.csConfProvider instanceof MutableConfigurationProvider) { - ((MutableConfigurationProvider) csConfProvider).recoverConf(); - } + refreshConfiguration(); super.serviceStart(); } @@ -2567,8 +2565,12 @@ public boolean moveReservedContainer(RMContainer toBeMovedContainer, public void updateConfiguration(UserGroupInformation user, SchedConfUpdateInfo confUpdate) throws IOException, YarnException { if (isConfigurationMutable()) { - ((MutableConfigurationProvider) csConfProvider).mutateConfiguration( - user, confUpdate); + try { + ((MutableConfigurationProvider) csConfProvider).mutateConfiguration( + user, confUpdate); + } catch (Exception e) { + throw new IOException(e); + } } else { throw new UnsupportedOperationException("Configured CS configuration " + "provider does not support updating configuration."); @@ -2579,4 +2581,19 @@ public void updateConfiguration(UserGroupInformation user, public boolean isConfigurationMutable() { return csConfProvider instanceof MutableConfigurationProvider; } + + @Override + public void refreshConfiguration() throws Exception { + if (this.csConfProvider instanceof MutableConfigurationProvider) { + ((MutableConfigurationProvider) csConfProvider).recoverConf(); + } + } + + @Override + public MutableConfigurationProvider getMutableConfProvider() { + if (isConfigurationMutable()) { + return (MutableConfigurationProvider) csConfProvider; + } + return null; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java index c63734dfac8..d01355cd3eb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import java.util.LinkedList; import java.util.List; @@ -28,7 +30,7 @@ * A default implementation of {@link YarnConfigurationStore}. Doesn't offer * persistent configuration storage, just stores the configuration in memory. */ -public class InMemoryConfigurationStore implements YarnConfigurationStore { +public class InMemoryConfigurationStore extends YarnConfigurationStore { private Configuration schedConf; private LinkedList pendingMutations; @@ -87,4 +89,31 @@ public synchronized Configuration retrieve() { // Unimplemented. return null; } + + @Override + public void setResourceManager(ResourceManager rm) { + // Unused. + } + + @Override + public Version getConfStoreVersion() throws Exception { + // Does nothing. + return null; + } + + @Override + public void storeVersion() throws Exception { + // Does nothing. + } + + @Override + public Version getCurrentVersion() { + // Does nothing. + return null; + } + + @Override + public void checkVersion() { + // Does nothing. (Version is always compatible since it's in memory) + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java index 1280fab931b..1dacdade8a4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java @@ -26,6 +26,10 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.fusesource.leveldbjni.JniDBFactory; import org.fusesource.leveldbjni.internal.NativeDB; import org.iq80.leveldb.DB; @@ -55,7 +59,7 @@ /** * A LevelDB implementation of {@link YarnConfigurationStore}. 
*/ -public class LeveldbConfigurationStore implements YarnConfigurationStore { +public class LeveldbConfigurationStore extends YarnConfigurationStore { public static final Log LOG = LogFactory.getLog(LeveldbConfigurationStore.class); @@ -63,6 +67,7 @@ private static final String DB_NAME = "yarn-conf-store"; private static final String LOG_PREFIX = "log."; private static final String LOG_COMMITTED_TXN = "committedTxn"; + private static final String VERSION_KEY = "version"; private DB db; // Txnid for the last transaction logged to the store. @@ -71,6 +76,8 @@ private long maxLogs; private Configuration conf; private LinkedList pendingMutations = new LinkedList<>(); + private static final Version CURRENT_VERSION_INFO = Version + .newInstance(0, 1); private Timer compactionTimer; private long compactionIntervalMsec; @@ -106,7 +113,7 @@ public void initialize(Configuration config, Configuration schedConf) } } this.maxLogs = config.getLong( - YarnConfiguration.RM_SCHEDCONF_LEVELDB_MAX_LOGS, + YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS); this.compactionIntervalMsec = config.getLong( YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS, @@ -299,6 +306,44 @@ private void startCompactionTimer() { } } + @Override + public void setResourceManager(ResourceManager rm) { + // Unused. + } + + // TODO: following is taken from LeveldbRMStateStore + @Override + public Version getConfStoreVersion() throws Exception { + Version version = null; + try { + byte[] data = db.get(bytes(VERSION_KEY)); + if (data != null) { + version = new VersionPBImpl(YarnServerCommonProtos.VersionProto + .parseFrom(data)); + } + } catch (DBException e) { + throw new IOException(e); + } + return version; + } + + @Override + public void storeVersion() throws Exception { + String key = VERSION_KEY; + byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto() + .toByteArray(); + try { + db.put(bytes(key), data); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public Version getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + private class CompactionTimerTask extends TimerTask { @Override public void run() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index d03b2e23d27..517d50e5279 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -76,6 +76,9 @@ public void init(Configuration config) throws IOException { case YarnConfiguration.LEVELDB_CONFIGURATION_STORE: this.confStore = new LeveldbConfigurationStore(); break; + case YarnConfiguration.ZK_CONFIGURATION_STORE: + this.confStore = new ZKConfigurationStore(); + break; default: this.confStore = YarnConfigurationStoreFactory.getStore(config); break; @@ -89,7 +92,12 @@ public void init(Configuration config) throws IOException { for 
(Map.Entry kv : initialSchedConf) { schedConf.set(kv.getKey(), kv.getValue()); } - confStore.initialize(config, schedConf); + confStore.setResourceManager(rmContext.getResourceManager()); + try { + confStore.initialize(config, schedConf); + } catch (Exception e) { + throw new IOException(e); + } // After initializing confStore, the store may already have an existing // configuration. Use this one. schedConf = confStore.retrieve(); @@ -99,6 +107,11 @@ public void init(Configuration config) throws IOException { } @Override + public YarnConfigurationStore getConfStore() { + return confStore; + } + + @Override public CapacitySchedulerConfiguration loadConfiguration(Configuration configuration) throws IOException { Configuration loadedConf = new Configuration(schedConf); @@ -108,7 +121,7 @@ public CapacitySchedulerConfiguration loadConfiguration(Configuration @Override public synchronized void mutateConfiguration(UserGroupInformation user, - SchedConfUpdateInfo confUpdate) throws IOException, YarnException { + SchedConfUpdateInfo confUpdate) throws Exception { if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) { throw new AccessControlException("User is not admin of all modified" + " queues."); @@ -135,8 +148,10 @@ public synchronized void mutateConfiguration(UserGroupInformation user, } @Override - public void recoverConf() throws IOException { + public synchronized void recoverConf() throws Exception { + confStore.checkVersion(); List uncommittedLogs = confStore.getPendingMutations(); + schedConf = confStore.retrieve(); Configuration oldConf = new Configuration(schedConf); for (LogMutation mutation : uncommittedLogs) { for (Map.Entry kv : mutation.getUpdates().entrySet()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java index 065c877eeb7..5ef542da9e7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java @@ -18,7 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateVersionIncompatibleException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import java.io.IOException; @@ -39,13 +44,15 @@ * {@code getPendingMutations}, and replay/confirm them via * {@code confirmMutation} as in the normal case. */ -public interface YarnConfigurationStore { +public abstract class YarnConfigurationStore { + public static final Log LOG = + LogFactory.getLog(YarnConfigurationStore.class); /** * LogMutation encapsulates the fields needed for configuration mutation * audit logging and recovery. 
*/ - class LogMutation implements Serializable { + static class LogMutation implements Serializable { private Map updates; private String user; private long id; @@ -55,7 +62,7 @@ * @param updates key-value configuration updates * @param user user who requested configuration change */ - public LogMutation(Map updates, String user) { + LogMutation(Map updates, String user) { this(updates, user, 0); } @@ -110,8 +117,8 @@ public void setId(long id) { * @param schedConf Initial key-value configuration to persist * @throws IOException if initialization fails */ - void initialize(Configuration conf, Configuration schedConf) - throws IOException; + public abstract void initialize(Configuration conf, Configuration schedConf) + throws Exception; /** * Logs the configuration change to backing store. Generates an id associated @@ -120,7 +127,7 @@ void initialize(Configuration conf, Configuration schedConf) * @return id which configuration store associates with this mutation * @throws IOException if logging fails */ - long logMutation(LogMutation logMutation) throws IOException; + public abstract long logMutation(LogMutation logMutation) throws Exception; /** * Should be called after {@code logMutation}. Gets the pending mutation @@ -135,26 +142,63 @@ void initialize(Configuration conf, Configuration schedConf) * @param isValid if true, update persisted configuration with mutation * associated with {@code id}. * @return true on success - * @throws IOException if mutation confirmation fails + * @throws Exception if mutation confirmation fails */ - boolean confirmMutation(long id, boolean isValid) throws IOException; + public abstract boolean confirmMutation(long id, boolean isValid) + throws Exception; /** * Retrieve the persisted configuration. * @return configuration as key-value */ - Configuration retrieve(); + public abstract Configuration retrieve(); /** * Get the list of pending mutations, in the order they were logged. * @return list of mutations + * @throws Exception if fail to get pending mutations from store */ - List getPendingMutations(); + public abstract List getPendingMutations() throws Exception; /** * Get a list of confirmed configuration mutations starting from a given id. * @param fromId id from which to start getting mutations, inclusive * @return list of configuration mutations */ - List getConfirmedConfHistory(long fromId); + public abstract List getConfirmedConfHistory(long fromId); + + /** + * Set the store's associated ResourceManager. + * @param rm the resource manager + */ + public abstract void setResourceManager(ResourceManager rm); + + protected abstract Version getConfStoreVersion() throws Exception; + + protected abstract void storeVersion() throws Exception; + + protected abstract Version getCurrentVersion(); + + public void checkVersion() throws Exception { + // TODO this was taken from RMStateStore. 
Should probably refactor + Version loadedVersion = getConfStoreVersion(); + LOG.info("Loaded configuration store version info " + loadedVersion); + if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) { + return; + } + // if there is no version info, treat it as CURRENT_VERSION_INFO; + if (loadedVersion == null) { + loadedVersion = getCurrentVersion(); + } + if (loadedVersion.isCompatibleTo(getCurrentVersion())) { + LOG.info("Storing configuration store version info " + + getCurrentVersion()); + storeVersion(); + } else { + throw new RMStateVersionIncompatibleException( + "Expecting configuration store version " + getCurrentVersion() + + ", but loading version " + loadedVersion); + } + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java new file mode 100644 index 00000000000..78e1d779249 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.curator.ZKCuratorManager; +import org.apache.hadoop.util.curator.ZKCuratorManager.SafeTransaction; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * A Zookeeper-based implementation of {@link YarnConfigurationStore}. 
+ */ +public class ZKConfigurationStore extends YarnConfigurationStore { + + public static final Log LOG = + LogFactory.getLog(ZKConfigurationStore.class); + + private long maxLogs; + + @VisibleForTesting + protected static final Version CURRENT_VERSION_INFO = Version + .newInstance(0, 1); + private Configuration conf; + + private ResourceManager rm; + + private String znodeParentPath; + + private static final String TXN_ID_PATH = "TXN_ID"; + private static final String ZK_VERSION_PATH = "VERSION"; + private static final String LOGS_PATH = "LOGS"; + private static final String CONF_STORE_PATH = "CONF_STORE"; + private static final String FENCING_PATH = "FENCING"; + + private String txnIdPath; + private String zkVersionPath; + private String logsPath; + private String confStorePath; + private String fencingNodePath; + + @VisibleForTesting + protected ZKCuratorManager zkManager; + private List zkAcl; + + private long txnId = 0; + + @Override + public void initialize(Configuration config, Configuration schedConf) + throws Exception { + this.conf = config; + this.maxLogs = conf.getLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, + YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS); + this.znodeParentPath = + conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_ZK_PARENT_PATH, + YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH); + this.zkManager = rm.getZKManager(); + if (zkManager == null) { + zkManager = rm.createAndStartZKManager(conf); + } + this.zkAcl = ZKCuratorManager.getZKAcls(conf); + + this.txnIdPath = getNodePath(znodeParentPath, TXN_ID_PATH); + this.zkVersionPath = getNodePath(znodeParentPath, ZK_VERSION_PATH); + this.logsPath = getNodePath(znodeParentPath, LOGS_PATH); + this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH); + this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH); + + zkManager.createRootDirRecursively(znodeParentPath); + zkManager.delete(fencingNodePath); + + if (zkManager.exists(txnIdPath)) { + this.txnId = getConfirmedTxnId(); + } else { + zkManager.create(txnIdPath); + zkManager.setData(txnIdPath, String.valueOf(txnId), -1); + } + + if (!zkManager.exists(logsPath)) { + zkManager.create(logsPath); + zkManager.setData(logsPath, + serializeObject(new TreeMap()), -1); + } + + if (!zkManager.exists(confStorePath)) { + zkManager.create(confStorePath); + HashMap mapSchedConf = new HashMap<>(); + for (Map.Entry entry : schedConf) { + mapSchedConf.put(entry.getKey(), entry.getValue()); + } + zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1); + } + } + + @VisibleForTesting + protected long getConfirmedTxnId() throws Exception { + return Long.parseLong(zkManager.getStringData(txnIdPath)); + } + + @VisibleForTesting + protected TreeMap getLogs() throws Exception { + return (TreeMap) + deserializeObject(zkManager.getData(logsPath)); + } + + // TODO: following version-related code is taken from ZKRMStateStore + @Override + public Version getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + @Override + public Version getConfStoreVersion() throws Exception { + if (zkManager.exists(zkVersionPath)) { + byte[] data = zkManager.getData(zkVersionPath); + return new VersionPBImpl(YarnServerCommonProtos.VersionProto + .parseFrom(data)); + } + + return null; + } + + @Override + public synchronized void storeVersion() throws Exception { + byte[] data = + ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); + + if (zkManager.exists(zkVersionPath)) { + zkManager.safeSetData(zkVersionPath, data, -1, zkAcl, fencingNodePath); + 
} else { + zkManager.safeCreate(zkVersionPath, data, zkAcl, CreateMode.PERSISTENT, + zkAcl, fencingNodePath); + } + } + + @Override + public synchronized long logMutation(LogMutation logMutation) + throws Exception { + byte[] storedLogs = zkManager.getData(logsPath); + TreeMap logs = new TreeMap<>(); + if (storedLogs != null) { + logs = (TreeMap) deserializeObject(storedLogs); + } + logs.put(++txnId, logMutation); + logMutation.setId(txnId); + if (logs.size() > maxLogs) { + logs.remove(logs.firstKey()); + } + zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl, + fencingNodePath); + return txnId; + } + + @Override + public synchronized boolean confirmMutation(long id, boolean isValid) + throws Exception { + SafeTransaction st = zkManager.createTransaction(zkAcl, fencingNodePath); + st.setData(txnIdPath, String.valueOf(id).getBytes(StandardCharsets.UTF_8), + -1); + if (isValid) { + byte[] mutationBytes = zkManager.getData(logsPath); + TreeMap mutations = + (TreeMap) deserializeObject(mutationBytes); + LogMutation mutation = mutations.get(id); + Configuration storedConfigs = retrieve(); + Map mapConf = new HashMap<>(); + for (Map.Entry storedConf : storedConfigs) { + mapConf.put(storedConf.getKey(), storedConf.getValue()); + } + for (Map.Entry confChange : + mutation.getUpdates().entrySet()) { + if (confChange.getValue() == null || confChange.getValue().isEmpty()) { + mapConf.remove(confChange.getKey()); + } else { + mapConf.put(confChange.getKey(), confChange.getValue()); + } + } + st.setData(confStorePath, serializeObject(mapConf), -1); + } + st.commit(); + return true; + } + + @Override + public synchronized Configuration retrieve() { + byte[] serializedSchedConf; + try { + serializedSchedConf = zkManager.getData(confStorePath); + } catch (Exception e) { + LOG.error("Failed to retrieve configuration from zookeeper store", e); + return null; + } + try { + Map map = + (HashMap) deserializeObject(serializedSchedConf); + Configuration c = new Configuration(); + for (Map.Entry e : map.entrySet()) { + c.set(e.getKey(), e.getValue()); + } + return c; + } catch (Exception e) { + LOG.error("Exception while deserializing scheduler configuration " + + "from store", e); + } + return null; + } + + @Override + public synchronized List getPendingMutations() throws Exception { + List pendingMutations = new ArrayList<>(); + TreeMap logs = getLogs(); + // Populate pendingMutations list for recovery + long confirmedTxnId = getConfirmedTxnId(); + Long startingRecoveryKey = logs.higherKey(confirmedTxnId); + if (startingRecoveryKey != null) { + for (long pendingTxnId = startingRecoveryKey; + pendingTxnId <= logs.lastKey(); ++pendingTxnId) { + // The txnIds should be contiguous, so this should never be null + pendingMutations.add(logs.get(pendingTxnId)); + } + } + return pendingMutations; + } + + @Override + public List getConfirmedConfHistory(long fromId) { + return null; // unimplemented + } + + @Override + public void setResourceManager(ResourceManager resourceManager) { + this.rm = resourceManager; + } + + private static String getNodePath(String root, String nodeName) { + return ZKCuratorManager.getNodePath(root, nodeName); + } + + private static byte[] serializeObject(Object o) throws Exception { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos);) { + oos.writeObject(o); + oos.flush(); + baos.flush(); + return baos.toByteArray(); + } + } + + private static Object deserializeObject(byte[] bytes) throws Exception { + try 
(ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + ObjectInputStream ois = new ObjectInputStream(bais);) { + return ois.readObject(); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 516b629342f..64756ae1c0f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -2440,6 +2440,7 @@ public Void run() throws IOException, YarnException { } }); } catch (IOException e) { + LOG.error("Exception thrown when modifying configuration.", e); return Response.status(Status.BAD_REQUEST).entity(e.getMessage()) .build(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java new file mode 100644 index 00000000000..2fc74a2573b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * Base class for {@link YarnConfigurationStore} implementations. + */ +public abstract class ConfigurationStoreBaseTest { + + protected YarnConfigurationStore confStore = createConfStore(); + + protected abstract YarnConfigurationStore createConfStore(); + + protected Configuration conf; + protected Configuration schedConf; + + protected static final String testUser = "testUser"; + + @Before + public void setUp() throws Exception { + this.conf = new Configuration(); + this.schedConf = new Configuration(false); + } + + @Test + public void testConfigurationUpdate() throws Exception { + schedConf.set("key1", "val1"); + confStore.initialize(conf, schedConf); + assertEquals("val1", confStore.retrieve().get("key1")); + + Map update1 = new HashMap<>(); + update1.put("keyUpdate1", "valUpdate1"); + YarnConfigurationStore.LogMutation mutation1 = + new YarnConfigurationStore.LogMutation(update1, testUser); + long id = confStore.logMutation(mutation1); + assertEquals(1, confStore.getPendingMutations().size()); + confStore.confirmMutation(id, true); + assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1")); + assertEquals(0, confStore.getPendingMutations().size()); + + Map update2 = new HashMap<>(); + update2.put("keyUpdate2", "valUpdate2"); + YarnConfigurationStore.LogMutation mutation2 = + new YarnConfigurationStore.LogMutation(update2, testUser); + id = confStore.logMutation(mutation2); + assertEquals(1, confStore.getPendingMutations().size()); + confStore.confirmMutation(id, false); + assertNull("Configuration should not be updated", + confStore.retrieve().get("keyUpdate2")); + assertEquals(0, confStore.getPendingMutations().size()); + } + + @Test + public void testNullConfigurationUpdate() throws Exception { + schedConf.set("key", "val"); + confStore.initialize(conf, schedConf); + assertEquals("val", confStore.retrieve().get("key")); + + Map update = new HashMap<>(); + update.put("key", null); + YarnConfigurationStore.LogMutation mutation = + new YarnConfigurationStore.LogMutation(update, testUser); + long id = confStore.logMutation(mutation); + confStore.confirmMutation(id, true); + assertNull(confStore.retrieve().get("key")); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java new file mode 100644 index 00000000000..c40d16a27cb --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java @@ -0,0 +1,30 
@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +/** + * Tests {@link InMemoryConfigurationStore}. + */ +public class TestInMemoryConfigurationStore extends ConfigurationStoreBaseTest { + + @Override + protected YarnConfigurationStore createConfStore() { + return new InMemoryConfigurationStore(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java index 635a184e8ff..ba7bd5a9f9f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java @@ -20,7 +20,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -82,7 +81,7 @@ public void setUp() { } @Test - public void testInMemoryBackedProvider() throws IOException, YarnException { + public void testInMemoryBackedProvider() throws Exception { Configuration conf = new Configuration(); confProvider.init(conf); assertNull(confProvider.loadConfiguration(conf) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java deleted file mode 100644 index 631ce657e84..00000000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -public class TestYarnConfigurationStore { - - private YarnConfigurationStore confStore; - private Configuration schedConf; - - private static final String testUser = "testUser"; - - @Before - public void setUp() { - schedConf = new Configuration(false); - schedConf.set("key1", "val1"); - } - - @Test - public void testInMemoryConfigurationStore() throws IOException { - confStore = new InMemoryConfigurationStore(); - confStore.initialize(new Configuration(), schedConf); - assertEquals("val1", confStore.retrieve().get("key1")); - - Map update1 = new HashMap<>(); - update1.put("keyUpdate1", "valUpdate1"); - LogMutation mutation1 = new LogMutation(update1, testUser); - long id = confStore.logMutation(mutation1); - assertEquals(1, confStore.getPendingMutations().size()); - confStore.confirmMutation(id, true); - assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1")); - assertEquals(0, confStore.getPendingMutations().size()); - - Map update2 = new HashMap<>(); - update2.put("keyUpdate2", "valUpdate2"); - LogMutation mutation2 = new LogMutation(update2, testUser); - id = confStore.logMutation(mutation2); - assertEquals(1, confStore.getPendingMutations().size()); - confStore.confirmMutation(id, false); - assertNull("Configuration should not be updated", - confStore.retrieve().get("keyUpdate2")); - assertEquals(0, confStore.getPendingMutations().size()); - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java new file mode 100644 index 00000000000..f7aa9f29fd7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java @@ -0,0 +1,401 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests {@link ZKConfigurationStore}. 
+ */ +public class TestZKConfigurationStore extends ConfigurationStoreBaseTest { + + public static final Log LOG = + LogFactory.getLog(TestZKConfigurationStore.class); + + private static final int ZK_TIMEOUT_MS = 10000; + private TestingServer curatorTestingServer; + private CuratorFramework curatorFramework; + private ResourceManager rm; + + public static TestingServer setupCuratorServer() throws Exception { + TestingServer curatorTestingServer = new TestingServer(); + curatorTestingServer.start(); + return curatorTestingServer; + } + + public static CuratorFramework setupCuratorFramework( + TestingServer curatorTestingServer) throws Exception { + CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() + .connectString(curatorTestingServer.getConnectString()) + .retryPolicy(new RetryNTimes(100, 100)) + .build(); + curatorFramework.start(); + return curatorFramework; + } + + @Before + public void setUp() throws Exception { + super.setUp(); + curatorTestingServer = setupCuratorServer(); + curatorFramework = setupCuratorFramework(curatorTestingServer); + + conf.set(CommonConfigurationKeys.ZK_ADDRESS, + curatorTestingServer.getConnectString()); + rm = new MockRM(conf); + rm.start(); + confStore.setResourceManager(rm); + } + + @After + public void cleanup() throws IOException { + rm.stop(); + curatorFramework.close(); + curatorTestingServer.stop(); + } + + @Override + public void testConfigurationUpdate() throws Exception { + super.testConfigurationUpdate(); + assertEquals(2, ((ZKConfigurationStore) confStore).getConfirmedTxnId()); + } + + @Test + public void testVersioning() throws Exception { + confStore.initialize(conf, schedConf); + assertNull(confStore.getConfStoreVersion()); + confStore.checkVersion(); + assertEquals(ZKConfigurationStore.CURRENT_VERSION_INFO, + confStore.getConfStoreVersion()); + } + + @Test + public void testPersistConfiguration() throws Exception { + schedConf.set("key", "val"); + confStore.initialize(conf, schedConf); + assertEquals("val", confStore.retrieve().get("key")); + + // Create a new configuration store, and check for old configuration + confStore = createConfStore(); + confStore.setResourceManager(rm); + schedConf.set("key", "badVal"); + // Should ignore passed-in scheduler configuration. + confStore.initialize(conf, schedConf); + assertEquals("val", confStore.retrieve().get("key")); + } + + + @Test + public void testPersistUpdatedConfiguration() throws Exception { + confStore.initialize(conf, schedConf); + assertNull(confStore.retrieve().get("key")); + + Map update = new HashMap<>(); + update.put("key", "val"); + YarnConfigurationStore.LogMutation mutation = + new YarnConfigurationStore.LogMutation(update, testUser); + long id = confStore.logMutation(mutation); + confStore.confirmMutation(id, true); + assertEquals("val", confStore.retrieve().get("key")); + + // Create a new configuration store, and check for updated configuration + confStore = createConfStore(); + confStore.setResourceManager(rm); + schedConf.set("key", "badVal"); + // Should ignore passed-in scheduler configuration. 
+ confStore.initialize(conf, schedConf); + assertEquals("val", confStore.retrieve().get("key")); + } + + @Test + public void testMaxLogs() throws Exception { + conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2); + confStore.initialize(conf, schedConf); + TreeMap logs = + ((ZKConfigurationStore) confStore).getLogs(); + assertEquals(0, logs.size()); + + Map update1 = new HashMap<>(); + update1.put("key1", "val1"); + YarnConfigurationStore.LogMutation mutation = + new YarnConfigurationStore.LogMutation(update1, testUser); + long id = confStore.logMutation(mutation); + logs = ((ZKConfigurationStore) confStore).getLogs(); + assertEquals(1, logs.size()); + assertEquals("val1", logs.get(id).getUpdates().get("key1")); + confStore.confirmMutation(id, true); + assertEquals(1, logs.size()); + assertEquals("val1", logs.get(id).getUpdates().get("key1")); + + Map update2 = new HashMap<>(); + update2.put("key2", "val2"); + mutation = new YarnConfigurationStore.LogMutation(update2, testUser); + id = confStore.logMutation(mutation); + logs = ((ZKConfigurationStore) confStore).getLogs(); + assertEquals(2, logs.size()); + assertEquals("val2", logs.get(id).getUpdates().get("key2")); + confStore.confirmMutation(id, true); + assertEquals(2, logs.size()); + assertEquals("val2", logs.get(id).getUpdates().get("key2")); + + // Next update should purge first update from logs. + Map update3 = new HashMap<>(); + update3.put("key3", "val3"); + mutation = new YarnConfigurationStore.LogMutation(update3, testUser); + id = confStore.logMutation(mutation); + logs = ((ZKConfigurationStore) confStore).getLogs(); + assertEquals(2, logs.size()); + assertNull(logs.get(id - 2)); + assertEquals("val2", logs.get(id - 1).getUpdates().get("key2")); + assertEquals("val3", logs.get(id).getUpdates().get("key3")); + confStore.confirmMutation(id, true); + assertEquals(2, logs.size()); + assertNull(logs.get(id - 2)); + assertEquals("val2", logs.get(id - 1).getUpdates().get("key2")); + assertEquals("val3", logs.get(id).getUpdates().get("key3")); + } + + @Test + public void testConfigurationRecovery() throws Exception { + RMContext rmContext = mock(RMContext.class); + when(rmContext.getResourceManager()).thenReturn(rm); + when(rmContext.getScheduler()).thenReturn(rm.getResourceScheduler()); + MutableCSConfigurationProvider confProvider = + new MutableCSConfigurationProvider(rmContext); + conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, + YarnConfiguration.ZK_CONFIGURATION_STORE); + confProvider.init(conf); + assertNull(confProvider.loadConfiguration(conf).get("key")); + assertNull(confProvider.getConfStore().retrieve().get("key")); + + Map update = new HashMap<>(); + update.put("key", "val"); + YarnConfigurationStore.LogMutation mutation = + new YarnConfigurationStore.LogMutation(update, testUser); + confProvider.getConfStore().logMutation(mutation); + + assertNull(confProvider.loadConfiguration(conf).get("key")); + assertNull(confProvider.getConfStore().retrieve().get("key")); + + confProvider.recoverConf(); + assertEquals("val", confProvider.loadConfiguration(conf).get("key")); + assertEquals("val", confProvider.getConfStore().retrieve().get("key")); + } + + public Configuration createRMHAConf(String rmIds, String rmId, + int adminPort) { + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, rmIds); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(CapacitySchedulerConfiguration.CS_CONF_PROVIDER, + 
+        CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER);
+    conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+        YarnConfiguration.ZK_CONFIGURATION_STORE);
+    conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
+    conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+        curatorTestingServer.getConnectString());
+    conf.set(YarnConfiguration.RM_HA_ID, rmId);
+    conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
+    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    for (String rpcAddress :
+        YarnConfiguration.getServiceAddressConfKeys(conf)) {
+      for (String id : HAUtil.getRMHAIds(conf)) {
+        conf.set(HAUtil.addSuffix(rpcAddress, id), "localhost:0");
+      }
+    }
+    conf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId),
+        "localhost:" + adminPort);
+    return conf;
+  }
+
+  /**
+   * When failing over, the new active RM should read from the current state
+   * of the store, including any updates made while the new active RM was in
+   * standby.
+   * @throws Exception
+   */
+  @Test
+  public void testFailoverReadsFromUpdatedStore() throws Exception {
+    HAServiceProtocol.StateChangeRequestInfo req =
+        new HAServiceProtocol.StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    Configuration conf1 = createRMHAConf("rm1,rm2", "rm1", 1234);
+    ResourceManager rm1 = new MockRM(conf1);
+    rm1.start();
+    rm1.getRMContext().getRMAdminService().transitionToActive(req);
+    assertEquals("RM with ZKStore didn't start",
+        Service.STATE.STARTED, rm1.getServiceState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+    assertNull(((MutableConfScheduler) rm1.getResourceScheduler())
+        .getConfiguration().get("key"));
+
+    Configuration conf2 = createRMHAConf("rm1,rm2", "rm2", 5678);
+    ResourceManager rm2 = new MockRM(conf2);
+    rm2.start();
+    assertEquals("RM should be Standby",
+        HAServiceProtocol.HAServiceState.STANDBY,
+        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    // Update configuration on rm1 while it is active.
+    SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
+    schedConfUpdateInfo.getGlobalParams().put("key", "val");
+    ((MutableConfScheduler) rm1.getResourceScheduler()).updateConfiguration(
+        UserGroupInformation.createUserForTesting(testUser, new String[0]),
+        schedConfUpdateInfo);
+    assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
+        .getConfiguration().get("key"));
+    assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
+        .getMutableConfProvider().getConfStore().retrieve().get("key"));
+
+    // Transition rm2 to active and verify it starts with the updated
+    // configuration.
+    rm2.getRMContext().getRMAdminService().transitionToActive(req);
+    assertEquals("RM with ZKStore didn't start",
+        Service.STATE.STARTED, rm2.getServiceState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    // Wait for rm1 to notice it has been fenced and transition to standby.
+    for (int i = 0; i < ZK_TIMEOUT_MS / 50; i++) {
+      if (HAServiceProtocol.HAServiceState.ACTIVE ==
+          rm1.getRMContext().getRMAdminService().getServiceStatus()
+              .getState()) {
+        Thread.sleep(100);
+      }
+    }
+    assertEquals("RM should have been fenced",
+        HAServiceProtocol.HAServiceState.STANDBY,
+        rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
assertEquals("val", ((MutableConfScheduler) rm2.getResourceScheduler()) + .getMutableConfProvider().getConfStore().retrieve().get("key")); + assertEquals("val", ((MutableConfScheduler) rm2.getResourceScheduler()) + .getConfiguration().get("key")); + rm1.close(); + rm2.close(); + } + + /** + * Test recover logged but not confirmed mutations on failover. + * @throws Exception + */ + @Test + public void testRecoverConfOnFailover() throws Exception { + HAServiceProtocol.StateChangeRequestInfo req = + new HAServiceProtocol.StateChangeRequestInfo(HAServiceProtocol + .RequestSource.REQUEST_BY_USER); + + Configuration conf1 = createRMHAConf("rm1,rm2", "rm1", 1234); + ResourceManager rm1 = new MockRM(conf1); + rm1.start(); + rm1.getRMContext().getRMAdminService().transitionToActive(req); + assertEquals("RM with ZKStore didn't start", + Service.STATE.STARTED, rm1.getServiceState()); + assertEquals("RM should be Active", + HAServiceProtocol.HAServiceState.ACTIVE, + rm1.getRMContext().getRMAdminService().getServiceStatus().getState()); + assertNull(((MutableConfScheduler) rm1.getResourceScheduler()) + .getConfiguration().get("key")); + + Configuration conf2 = createRMHAConf("rm1,rm2", "rm2", 5678); + ResourceManager rm2 = new MockRM(conf2); + rm2.start(); + assertEquals("RM should be Standby", + HAServiceProtocol.HAServiceState.STANDBY, + rm2.getRMContext().getRMAdminService().getServiceStatus().getState()); + + // Log a configuration on active RM1 + Map update = new HashMap<>(); + update.put("key", "val"); + YarnConfigurationStore.LogMutation mutation = + new YarnConfigurationStore.LogMutation(update, testUser); + ((MutableConfScheduler) rm1.getResourceScheduler()) + .getMutableConfProvider().getConfStore().logMutation(mutation); + assertNull(((MutableConfScheduler) rm1.getResourceScheduler()) + .getConfiguration().get("key")); + + // Start RM2, and verifies the log gets replayed on recovery + rm2.getRMContext().getRMAdminService().transitionToActive(req); + assertEquals("RM with ZKStore didn't start", + Service.STATE.STARTED, rm2.getServiceState()); + assertEquals("RM should be Active", + HAServiceProtocol.HAServiceState.ACTIVE, + rm2.getRMContext().getRMAdminService().getServiceStatus().getState()); + + for (int i = 0; i < ZK_TIMEOUT_MS / 50; i++) { + if (HAServiceProtocol.HAServiceState.ACTIVE == + rm1.getRMContext().getRMAdminService().getServiceStatus() + .getState()) { + Thread.sleep(100); + } + } + assertEquals("RM should have been fenced", + HAServiceProtocol.HAServiceState.STANDBY, + rm1.getRMContext().getRMAdminService().getServiceStatus().getState()); + assertEquals("RM should be Active", + HAServiceProtocol.HAServiceState.ACTIVE, + rm2.getRMContext().getRMAdminService().getServiceStatus().getState()); + + assertEquals("val", ((MutableConfScheduler) rm2.getResourceScheduler()) + .getConfiguration().get("key")); + rm1.close(); + rm2.close(); + } + + @Override + public YarnConfigurationStore createConfStore() { + return new ZKConfigurationStore(); + } +}