From aa0bd83425fd72c796797a194844d00bd0693bb0 Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Fri, 30 Aug 2019 17:58:03 +0530 Subject: [PATCH] YARN-9788. Support Parallel Log Mutation in Queue Management API. --- .../scheduler/MutableConfigurationProvider.java | 5 +- .../conf/FSSchedulerConfigurationStore.java | 7 ++- .../capacity/conf/InMemoryConfigurationStore.java | 6 +-- .../capacity/conf/LeveldbConfigurationStore.java | 6 +-- .../conf/MutableCSConfigurationProvider.java | 8 +-- .../capacity/conf/YarnConfigurationStore.java | 5 +- .../capacity/conf/ZKConfigurationStore.java | 7 +-- .../resourcemanager/webapp/RMWebServices.java | 8 +-- .../capacity/conf/ConfigurationStoreBaseTest.java | 6 +-- .../conf/TestFSSchedulerConfigurationStore.java | 6 +-- .../conf/TestLeveldbConfigurationStore.java | 15 +++--- .../conf/TestMutableCSConfigurationProvider.java | 58 +++++++++++++++++----- .../capacity/conf/TestZKConfigurationStore.java | 26 +++++----- 13 files changed, 98 insertions(+), 65 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java index 6e56f3d..53afb24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import java.io.IOException; @@ -48,7 +49,7 @@ * @param confUpdate User's requested configuration change * @throws Exception if logging the mutation fails */ - void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo + LogMutation logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo confUpdate) throws Exception; /** @@ -57,7 +58,7 @@ void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo * properly. * @throws Exception if confirming mutation fails */ - void confirmPendingMutation(boolean isValid) throws Exception; + void confirmPendingMutation(LogMutation pendingMutation, boolean isValid) throws Exception; /** * Returns scheduler configuration cached in this provider. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java index 3ef97a0..961a1ff9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java @@ -57,7 +57,6 @@ private int maxVersion; private Path schedulerConfDir; private FileSystem fileSystem; - private LogMutation pendingMutation; private PathFilter configFilePathFilter; private volatile Configuration schedConf; private volatile Configuration oldConf; @@ -114,10 +113,9 @@ public boolean accept(Path path) { */ @Override public void logMutation(LogMutation logMutation) throws IOException { - pendingMutation = logMutation; LOG.info(new GsonBuilder().serializeNulls().create().toJson(logMutation)); oldConf = new Configuration(schedConf); - Map mutations = pendingMutation.getUpdates(); + Map mutations = logMutation.getUpdates(); for (Map.Entry kv : mutations.entrySet()) { if (kv.getValue() == null) { this.schedConf.unset(kv.getKey()); @@ -134,7 +132,8 @@ public void logMutation(LogMutation logMutation) throws IOException { * @throws Exception throw IOE when write temp configuration file fail */ @Override - public void confirmMutation(boolean isValid) throws Exception { + public void confirmMutation(LogMutation pendingMutation, + boolean isValid) throws Exception { if (pendingMutation == null || tempConfigPath == null) { LOG.warn("pendingMutation or tempConfigPath is null, do nothing"); return; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java index d69c236..3d24fdb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java @@ -32,7 +32,6 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore { private Configuration schedConf; - private LogMutation pendingMutation; @Override public void initialize(Configuration conf, Configuration schedConf, @@ -42,11 +41,11 @@ public void initialize(Configuration conf, Configuration schedConf, @Override public void logMutation(LogMutation logMutation) { - pendingMutation = logMutation; + } @Override - public void confirmMutation(boolean isValid) { + public void confirmMutation(LogMutation pendingMutation, boolean isValid) { if (isValid) { for (Map.Entry kv : pendingMutation.getUpdates() .entrySet()) { @@ -57,7 +56,6 @@ public void confirmMutation(boolean isValid) { } } } - pendingMutation = null; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java index 4eb328c..8541027 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java @@ -72,7 +72,6 @@ private DB db; private long maxLogs; private Configuration conf; - private LogMutation pendingMutation; @VisibleForTesting protected static final Version CURRENT_VERSION_INFO = Version .newInstance(0, 1); @@ -191,11 +190,11 @@ public void logMutation(LogMutation logMutation) throws IOException { logs.removeFirst(); } db.put(bytes(LOG_KEY), serLogMutations(logs)); - pendingMutation = logMutation; } @Override - public void confirmMutation(boolean isValid) throws IOException { + public void confirmMutation(LogMutation pendingMutation, + boolean isValid) throws IOException { WriteBatch updateBatch = db.createWriteBatch(); if (isValid) { for (Map.Entry changes : @@ -208,7 +207,6 @@ public void confirmMutation(boolean isValid) throws IOException { } } db.write(updateBatch); - pendingMutation = null; } private byte[] serLogMutations(LinkedList mutations) throws diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index 54000ad..30d2e7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -136,7 +136,7 @@ public ConfigurationMutationACLPolicy getAclMutationPolicy() { } @Override - public void logAndApplyMutation(UserGroupInformation user, + public LogMutation logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo confUpdate) throws Exception { oldConf = new Configuration(schedConf); Map kvUpdate = constructKeyValueConfUpdate(confUpdate); @@ -149,11 +149,13 @@ public void logAndApplyMutation(UserGroupInformation user, schedConf.set(kv.getKey(), kv.getValue()); } } + return log; } @Override - public void confirmPendingMutation(boolean isValid) throws Exception { - confStore.confirmMutation(isValid); + public void confirmPendingMutation(LogMutation pendingMutation, + boolean isValid) throws Exception { + confStore.confirmMutation(pendingMutation, isValid); if (!isValid) { schedConf = oldConf; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java index 2cc831f..153500b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java @@ -52,7 +52,7 @@ * LogMutation encapsulates the fields needed for configuration mutation * audit logging and recovery. */ - static class LogMutation implements Serializable { + public static class LogMutation implements Serializable { private Map updates; private String user; @@ -117,7 +117,8 @@ public void close() throws IOException {} * mutation. * @throws Exception if mutation confirmation fails */ - public abstract void confirmMutation(boolean isValid) throws Exception; + public abstract void confirmMutation(LogMutation pendingMutation, + boolean isValid) throws Exception; /** * Retrieve the persisted configuration. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java index 34c73ec..5d0159a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java @@ -54,7 +54,6 @@ protected static final Version CURRENT_VERSION_INFO = Version .newInstance(0, 1); private Configuration conf; - private LogMutation pendingMutation; private String znodeParentPath; @@ -158,12 +157,11 @@ public void logMutation(LogMutation logMutation) throws Exception { } zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl, fencingNodePath); - pendingMutation = logMutation; } @Override - public void confirmMutation(boolean isValid) - throws Exception { + public synchronized void confirmMutation(LogMutation pendingMutation, + boolean isValid) throws Exception { if (isValid) { Configuration storedConfigs = retrieve(); Map mapConf = new HashMap<>(); @@ -181,7 +179,6 @@ public void confirmMutation(boolean isValid) zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1, zkAcl, fencingNodePath); } - pendingMutation = null; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 762569f..956d89f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -147,6 +147,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; @@ -2504,14 +2505,15 @@ public Void run() throws Exception { throw new org.apache.hadoop.security.AccessControlException("User" + " is not admin of all modified queues."); } - provider.logAndApplyMutation(callerUGI, mutationInfo); + LogMutation logMutation = provider.logAndApplyMutation(callerUGI, + mutationInfo); try { rm.getRMContext().getRMAdminService().refreshQueues(); } catch (IOException | YarnException e) { - provider.confirmPendingMutation(false); + provider.confirmPendingMutation(logMutation, false); throw e; } - provider.confirmPendingMutation(true); + provider.confirmPendingMutation(logMutation, true); return null; } }); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java index 0f50b53..4b3153a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java @@ -64,7 +64,7 @@ public void testConfigurationUpdate() throws Exception { YarnConfigurationStore.LogMutation mutation1 = new YarnConfigurationStore.LogMutation(update1, TEST_USER); confStore.logMutation(mutation1); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation1, true); assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1")); Map update2 = new HashMap<>(); @@ -72,7 +72,7 @@ public void testConfigurationUpdate() throws Exception { YarnConfigurationStore.LogMutation mutation2 = new YarnConfigurationStore.LogMutation(update2, TEST_USER); confStore.logMutation(mutation2); - confStore.confirmMutation(false); + confStore.confirmMutation(mutation2, false); assertNull("Configuration should not be updated", confStore.retrieve().get("keyUpdate2")); confStore.close(); @@ -89,7 +89,7 @@ public void testNullConfigurationUpdate() throws Exception { YarnConfigurationStore.LogMutation mutation = new YarnConfigurationStore.LogMutation(update, TEST_USER); confStore.logMutation(mutation); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertNull(confStore.retrieve().get("key")); confStore.close(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java index 65314be..f7415df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java @@ -96,7 +96,7 @@ public void confirmMutationWithValid() throws Exception { LogMutation logMutation = new LogMutation(updates, "test"); configurationStore.logMutation(logMutation); - configurationStore.confirmMutation(true); + configurationStore.confirmMutation(logMutation, true); storeConf = configurationStore.retrieve(); assertEquals(null, storeConf.get("a")); assertEquals("bb", storeConf.get("b")); @@ -106,7 +106,7 @@ public void confirmMutationWithValid() throws Exception { updates.put("b", "bbb"); configurationStore.logMutation(logMutation); - configurationStore.confirmMutation(true); + configurationStore.confirmMutation(logMutation, true); storeConf = configurationStore.retrieve(); assertEquals(null, storeConf.get("a")); assertEquals("bbb", storeConf.get("b")); @@ -129,7 +129,7 @@ public void confirmMutationWithInValid() throws Exception { LogMutation logMutation = new LogMutation(updates, "test"); configurationStore.logMutation(logMutation); - configurationStore.confirmMutation(false); + configurationStore.confirmMutation(logMutation, false); storeConf = configurationStore.retrieve(); compareConfig(conf, storeConf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java index 3381637..2a5166a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.junit.Before; import org.junit.Test; @@ -99,7 +100,7 @@ public void testPersistUpdatedConfiguration() throws Exception { YarnConfigurationStore.LogMutation mutation = new YarnConfigurationStore.LogMutation(update, TEST_USER); confStore.logMutation(mutation); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertEquals("val", confStore.retrieve().get("key")); confStore.close(); @@ -128,7 +129,7 @@ public void testMaxLogs() throws Exception { logs = ((LeveldbConfigurationStore) confStore).getLogs(); assertEquals(1, logs.size()); assertEquals("val1", logs.get(0).getUpdates().get("key1")); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertEquals(1, logs.size()); assertEquals("val1", logs.get(0).getUpdates().get("key1")); @@ -140,7 +141,7 @@ public void testMaxLogs() throws Exception { assertEquals(2, logs.size()); assertEquals("val1", logs.get(0).getUpdates().get("key1")); assertEquals("val2", logs.get(1).getUpdates().get("key2")); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertEquals(2, logs.size()); assertEquals("val1", logs.get(0).getUpdates().get("key1")); assertEquals("val2", logs.get(1).getUpdates().get("key2")); @@ -154,7 +155,7 @@ public void testMaxLogs() throws Exception { assertEquals(2, logs.size()); assertEquals("val2", logs.get(0).getUpdates().get("key2")); assertEquals("val3", logs.get(1).getUpdates().get("key3")); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertEquals(2, logs.size()); assertEquals("val2", logs.get(0).getUpdates().get("key2")); assertEquals("val3", logs.get(1).getUpdates().get("key3")); @@ -180,16 +181,16 @@ public void testRestartReadsFromUpdatedStore() throws Exception { rm1.getResourceScheduler()).getMutableConfProvider(); UserGroupInformation user = UserGroupInformation .createUserForTesting(TEST_USER, new String[0]); - confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + LogMutation log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo); rm1.getResourceScheduler().reinitialize(conf, rm1.getRMContext()); assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler()) .getConfiguration().get("key")); - confProvider.confirmPendingMutation(true); + confProvider.confirmPendingMutation(log, true); assertEquals("val", ((MutableCSConfigurationProvider) confProvider) .getConfStore().retrieve().get("key")); // Next update is not persisted, it should not be recovered schedConfUpdateInfo.getGlobalParams().put("key", "badVal"); - confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo); rm1.close(); // Start RM2 and verifies it starts with updated configuration diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java index bb3cc60..722bd03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.junit.Before; @@ -93,15 +94,15 @@ public void testInMemoryBackedProvider() throws Exception { assertNull(confProvider.loadConfiguration(conf) .get("yarn.scheduler.capacity.root.a.goodKey")); - confProvider.logAndApplyMutation(TEST_USER, goodUpdate); - confProvider.confirmPendingMutation(true); + LogMutation log = confProvider.logAndApplyMutation(TEST_USER, goodUpdate); + confProvider.confirmPendingMutation(log, true); assertEquals("goodVal", confProvider.loadConfiguration(conf) .get("yarn.scheduler.capacity.root.a.goodKey")); assertNull(confProvider.loadConfiguration(conf).get( "yarn.scheduler.capacity.root.a.badKey")); - confProvider.logAndApplyMutation(TEST_USER, badUpdate); - confProvider.confirmPendingMutation(false); + log = confProvider.logAndApplyMutation(TEST_USER, badUpdate); + confProvider.confirmPendingMutation(log, false); assertNull(confProvider.loadConfiguration(conf).get( "yarn.scheduler.capacity.root.a.badKey")); } @@ -120,8 +121,8 @@ public void testRemoveQueueConfig() throws Exception { QueueConfigInfo("root.a", updateMap); updateInfo.getUpdateQueueInfo().add(queueConfigInfo); - confProvider.logAndApplyMutation(TEST_USER, updateInfo); - confProvider.confirmPendingMutation(true); + LogMutation log = confProvider.logAndApplyMutation(TEST_USER, updateInfo); + confProvider.confirmPendingMutation(log, true); assertEquals("testval", confProvider.loadConfiguration(conf) .get("yarn.scheduler.capacity.root.a.testkey")); @@ -130,14 +131,47 @@ public void testRemoveQueueConfig() throws Exception { queueConfigInfo = new QueueConfigInfo("root.a", updateMap); updateInfo.getUpdateQueueInfo().add(queueConfigInfo); - confProvider.logAndApplyMutation(TEST_USER, updateInfo); - confProvider.confirmPendingMutation(true); + log = confProvider.logAndApplyMutation(TEST_USER, updateInfo); + confProvider.confirmPendingMutation(log, true); assertNull("Failed to remove config", confProvider.loadConfiguration(conf). get("yarn.scheduler.capacity.root.a.testkey")); } @Test + public void testParallelUpdates() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, + YarnConfiguration.MEMORY_CONFIGURATION_STORE); + confProvider.init(conf); + + SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo(); + Map updateMap = new HashMap<>(); + updateMap.put("key1", "val1"); + QueueConfigInfo queueConfigInfo = new + QueueConfigInfo("root.a", updateMap); + updateInfo.getUpdateQueueInfo().add(queueConfigInfo); + LogMutation log1 = confProvider.logAndApplyMutation(TEST_USER, updateInfo); + + SchedConfUpdateInfo updateInfo1 = new SchedConfUpdateInfo(); + Map updateMap1 = new HashMap<>(); + updateMap1.put("key2", "val2"); + QueueConfigInfo queueConfigInfo1 = new + QueueConfigInfo("root.a", updateMap1); + updateInfo1.getUpdateQueueInfo().add(queueConfigInfo1); + LogMutation log2 = confProvider.logAndApplyMutation(TEST_USER, updateInfo1); + + confProvider.confirmPendingMutation(log1, true); + confProvider.confirmPendingMutation(log2, true); + + assertEquals("val1", confProvider.loadConfiguration(conf) + .get("yarn.scheduler.capacity.root.a.key1")); + assertEquals("val2", confProvider.loadConfiguration(conf) + .get("yarn.scheduler.capacity.root.a.key2")); + } + + + @Test public void testHDFSBackedProvider() throws Exception { File testSchedulerConfigurationDir = new File( TestMutableCSConfigurationProvider.class.getResource("").getPath() @@ -156,15 +190,15 @@ public void testHDFSBackedProvider() throws Exception { assertNull(confProvider.loadConfiguration(conf) .get("yarn.scheduler.capacity.root.a.goodKey")); - confProvider.logAndApplyMutation(TEST_USER, goodUpdate); - confProvider.confirmPendingMutation(true); + LogMutation log = confProvider.logAndApplyMutation(TEST_USER, goodUpdate); + confProvider.confirmPendingMutation(log, true); assertEquals("goodVal", confProvider.loadConfiguration(conf) .get("yarn.scheduler.capacity.root.a.goodKey")); assertNull(confProvider.loadConfiguration(conf).get( "yarn.scheduler.capacity.root.a.badKey")); - confProvider.logAndApplyMutation(TEST_USER, badUpdate); - confProvider.confirmPendingMutation(false); + log = confProvider.logAndApplyMutation(TEST_USER, badUpdate); + confProvider.confirmPendingMutation(log, false); assertNull(confProvider.loadConfiguration(conf).get( "yarn.scheduler.capacity.root.a.badKey")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java index c09ae28..7bf2ff8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.junit.After; @@ -135,10 +136,9 @@ public void testPersistUpdatedConfiguration() throws Exception { Map update = new HashMap<>(); update.put("key", "val"); - YarnConfigurationStore.LogMutation mutation = - new YarnConfigurationStore.LogMutation(update, TEST_USER); + LogMutation mutation = new LogMutation(update, TEST_USER); confStore.logMutation(mutation); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertEquals("val", confStore.retrieve().get("key")); // Create a new configuration store, and check for updated configuration @@ -165,7 +165,7 @@ public void testMaxLogs() throws Exception { logs = ((ZKConfigurationStore) confStore).getLogs(); assertEquals(1, logs.size()); assertEquals("val1", logs.get(0).getUpdates().get("key1")); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertEquals(1, logs.size()); assertEquals("val1", logs.get(0).getUpdates().get("key1")); @@ -177,7 +177,7 @@ public void testMaxLogs() throws Exception { assertEquals(2, logs.size()); assertEquals("val1", logs.get(0).getUpdates().get("key1")); assertEquals("val2", logs.get(1).getUpdates().get("key2")); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertEquals(2, logs.size()); assertEquals("val1", logs.get(0).getUpdates().get("key1")); assertEquals("val2", logs.get(1).getUpdates().get("key2")); @@ -191,7 +191,7 @@ public void testMaxLogs() throws Exception { assertEquals(2, logs.size()); assertEquals("val2", logs.get(0).getUpdates().get("key2")); assertEquals("val3", logs.get(1).getUpdates().get("key3")); - confStore.confirmMutation(true); + confStore.confirmMutation(mutation, true); assertEquals(2, logs.size()); assertEquals("val2", logs.get(0).getUpdates().get("key2")); assertEquals("val3", logs.get(1).getUpdates().get("key3")); @@ -261,16 +261,16 @@ public void testFailoverReadsFromUpdatedStore() throws Exception { rm1.getResourceScheduler()).getMutableConfProvider(); UserGroupInformation user = UserGroupInformation .createUserForTesting(TEST_USER, new String[0]); - confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + LogMutation log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo); rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext()); assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler()) .getConfiguration().get("key")); - confProvider.confirmPendingMutation(true); + confProvider.confirmPendingMutation(log, true); assertEquals("val", ((MutableCSConfigurationProvider) confProvider) .getConfStore().retrieve().get("key")); // Next update is not persisted, it should not be recovered schedConfUpdateInfo.getGlobalParams().put("key", "badVal"); - confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo); // Start RM2 and verifies it starts with updated configuration rm2.getRMContext().getRMAdminService().transitionToActive(req); @@ -353,9 +353,9 @@ public void testFailoverAfterRemoveQueue() throws Exception { stopParams.put("capacity", "0"); QueueConfigInfo stopInfo = new QueueConfigInfo("root.default", stopParams); schedConfUpdateInfo.getUpdateQueueInfo().add(stopInfo); - confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + LogMutation log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo); rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext()); - confProvider.confirmPendingMutation(true); + confProvider.confirmPendingMutation(log, true); assertTrue(Arrays.asList(((MutableConfScheduler) rm1.getResourceScheduler()) .getConfiguration().get("yarn.scheduler.capacity.root.queues").split (",")).contains("a")); @@ -364,9 +364,9 @@ public void testFailoverAfterRemoveQueue() throws Exception { schedConfUpdateInfo.getUpdateQueueInfo().clear(); schedConfUpdateInfo.getAddQueueInfo().clear(); schedConfUpdateInfo.getRemoveQueueInfo().add("root.default"); - confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo); rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext()); - confProvider.confirmPendingMutation(true); + confProvider.confirmPendingMutation(log, true); assertEquals("a", ((MutableConfScheduler) rm1.getResourceScheduler()) .getConfiguration().get("yarn.scheduler.capacity.root.queues")); -- 2.7.4 (Apple Git-66)