diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 32665d7..c709d0c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -329,6 +329,15 @@ public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids";
   public static final String RM_HA_ID = RM_HA_PREFIX + "id";
 
+  /** Directory on the FileSystem where the refreshable configuration files are stored. */
+  public static final String RM_CONF_STORE = RM_PREFIX + "conf.store";
+  public static final String DEFAULT_RM_CONF_STORE = "/confStore";
+
+  public static final String RM_REMOTE_CONFIGURATION = RM_PREFIX +
+      "remote-configuration.class";
+  public static final String DEFAULT_RM_REMOTE_CONFIGURATION =
+      "org.apache.hadoop.yarn.FileSystemBasedRemoteConfiguration";
+
   @Private
   public static final List<String> RM_SERVICES_ADDRESS_CONF_KEYS =
       Collections.unmodifiableList(Arrays.asList(
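For reference, a minimal sketch of how the two new keys could be set programmatically before an HA ResourceManager is started; the store path used here is purely illustrative and not part of the patch.

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RemoteConfStoreSetup {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    // Directory in the default FileSystem that holds the refreshable files.
    conf.set(YarnConfiguration.RM_CONF_STORE, "/rm/confStore");
    // Pluggable provider; FileSystemBasedRemoteConfiguration is the default.
    conf.set(YarnConfiguration.RM_REMOTE_CONFIGURATION,
        YarnConfiguration.DEFAULT_RM_REMOTE_CONFIGURATION);
    System.out.println(conf.get(YarnConfiguration.RM_CONF_STORE));
  }
}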
.contains("Application with id '" + fakeAppId + "' " + "doesn't exist in RM.")); } + + @Test + public void testAdminServiceRefreshQueuesOnHA() throws IOException, + YarnException { + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + + // clean the remoteDirectory + getAdminService(0).getRemoteConfiguration().cleanRemoteDirectory(); + + CapacityScheduler cs = + (CapacityScheduler) cluster.getResourceManager(0).getRMContext() + .getScheduler(); + int maxAppsBefore = cs.getConfiguration().getMaximumSystemApplications(); + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.set("yarn.scheduler.capacity.maximum-applications", "5000"); + String csConfFile = writeConfigurationXML(csConf, + "capacity-scheduler.xml"); + + // upload the file into Remote File System + getAdminService(0).getRemoteConfiguration().uploadToRemoteFileSystem( + new Path(csConfFile)); + + getAdminService(0).refreshQueues(RefreshQueuesRequest.newInstance()); + + int maxAppsAfter = cs.getConfiguration().getMaximumSystemApplications(); + Assert.assertEquals(maxAppsAfter, 5000); + Assert.assertTrue(maxAppsAfter != maxAppsBefore); + } + + private String writeConfigurationXML(Configuration conf, String confXMLName) + throws IOException { + DataOutputStream output = null; + try { + final File basedir = + new File("target", TestRMFailover.class.getName()); + final File tmpDir = new File(basedir, "tmpDir"); + tmpDir.mkdirs(); + final File confFile = new File(tmpDir, confXMLName); + if (confFile.exists()) { + confFile.delete(); + } + if (!confFile.createNewFile()) { + Assert.fail("Can not create " + confXMLName); + } + output = new DataOutputStream( + new FileOutputStream(confFile)); + conf.writeXml(output); + return confFile.getAbsolutePath(); + } finally { + if (output != null) { + output.close(); + } + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedRemoteConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedRemoteConfiguration.java new file mode 100644 index 0000000..22516de --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedRemoteConfiguration.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/RemoteConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/RemoteConfiguration.java
new file mode 100644
index 0000000..7ba2202
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/RemoteConfiguration.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+
+public abstract class RemoteConfiguration extends Configuration {
+
+  public void init(Configuration conf) throws Exception {
+    initInternal(conf);
+  }
+
+  public void close() throws Exception {
+    closeInternal();
+  }
+
+  public abstract Configuration getConfiguration(String name)
+      throws IOException;
+
+  public abstract void initInternal(Configuration conf) throws Exception;
+
+  public abstract void closeInternal() throws Exception;
+
+  public abstract void uploadToRemoteFileSystem(Path filePath)
+      throws IOException;
+
+  public abstract void cleanRemoteDirectory() throws IOException;
+
+  public abstract void deleteFileFromRemoteDirectory(String fileName)
+      throws IOException;
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/RemoteConfigurationFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/RemoteConfigurationFactory.java
new file mode 100644
index 0000000..e8afd7c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/RemoteConfigurationFactory.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+
+public class RemoteConfigurationFactory {
+
+  @SuppressWarnings("unchecked")
+  public static RemoteConfiguration getRemoteConfiguration(Configuration conf) {
+    Class<? extends RemoteConfiguration> defaultProviderClass;
+    try {
+      defaultProviderClass = (Class<? extends RemoteConfiguration>)
+          Class.forName(
+              YarnConfiguration.DEFAULT_RM_REMOTE_CONFIGURATION);
+    } catch (Exception e) {
+      throw new YarnRuntimeException("Invalid default remote configuration "
+          + "class: " + YarnConfiguration.DEFAULT_RM_REMOTE_CONFIGURATION, e);
+    }
+    RemoteConfiguration remoteConfiguration =
+        ReflectionUtils.newInstance(
+            conf.getClass(YarnConfiguration.RM_REMOTE_CONFIGURATION,
+                defaultProviderClass, RemoteConfiguration.class), conf);
+    return remoteConfiguration;
+  }
+}
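A small sketch of how the factory above is meant to be used; MyRemoteConfiguration in the commented lines is a hypothetical subclass, shown only to illustrate how an override of yarn.resourcemanager.remote-configuration.class would be wired in.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.RemoteConfiguration;
import org.apache.hadoop.yarn.RemoteConfigurationFactory;

public class RemoteConfigurationFactoryExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // With no override this resolves to FileSystemBasedRemoteConfiguration.
    RemoteConfiguration remote =
        RemoteConfigurationFactory.getRemoteConfiguration(conf);
    System.out.println(remote.getClass().getName());

    // Hypothetical override of the provider class:
    // conf.setClass(YarnConfiguration.RM_REMOTE_CONFIGURATION,
    //     MyRemoteConfiguration.class, RemoteConfiguration.class);
    // remote = RemoteConfigurationFactory.getRemoteConfiguration(conf);
  }
}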
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 971603a..5a2df15 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -45,6 +45,8 @@
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.RemoteConfiguration;
+import org.apache.hadoop.yarn.RemoteConfigurationFactory;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.conf.HAUtil;
@@ -72,6 +74,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingService;
 
 public class AdminService extends CompositeService implements
@@ -89,6 +92,8 @@
   private InetSocketAddress masterServiceAddress;
   private AccessControlList adminAcl;
 
+  private RemoteConfiguration remoteConfiguration = null;
+
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
 
@@ -107,6 +112,9 @@ public synchronized void serviceInit(Configuration conf) throws Exception {
           addIfService(createEmbeddedElectorService());
         }
       }
+      remoteConfiguration =
+          RemoteConfigurationFactory.getRemoteConfiguration(conf);
+      remoteConfiguration.init(conf);
     }
 
     masterServiceAddress = conf.getSocketAddr(
@@ -129,6 +137,9 @@ protected synchronized void serviceStart() throws Exception {
   @Override
   protected synchronized void serviceStop() throws Exception {
     stopServer();
+    if (remoteConfiguration != null) {
+      remoteConfiguration.close();
+    }
     super.serviceStop();
   }
 
@@ -295,23 +306,32 @@ public synchronized HAServiceStatus getServiceStatus() throws IOException {
   @Override
   public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
       throws YarnException, StandbyException {
-    UserGroupInformation user = checkAcls("refreshQueues");
+    String argName = "refreshQueues";
+    UserGroupInformation user = checkAcls(argName);
 
     if (!isRMActive()) {
-      RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues",
+      RMAuditLogger.logFailure(user.getShortUserName(), argName,
           adminAcl.toString(), "AdminService",
           "ResourceManager is not active. Can not refresh queues.");
       throwStandbyException();
     }
 
+    RefreshQueuesResponse response =
+        recordFactory.newRecordInstance(RefreshQueuesResponse.class);
     try {
-      rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
-      RMAuditLogger.logSuccess(user.getShortUserName(), "refreshQueues",
+      Configuration conf = getConfiguration(argName);
+      if (this.rmContext.isHAEnabled() && conf == null) {
+        LOG.warn(printFailureDescription(
+            getConfigurationFileName(argName), argName));
+        return response;
+      }
+      rmContext.getScheduler().reinitialize(conf, this.rmContext);
+      RMAuditLogger.logSuccess(user.getShortUserName(), argName,
           "AdminService");
-      return recordFactory.newRecordInstance(RefreshQueuesResponse.class);
+      return response;
     } catch (IOException ioe) {
       LOG.info("Exception refreshing queues ", ioe);
-      RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues",
+      RMAuditLogger.logFailure(user.getShortUserName(), argName,
          adminAcl.toString(), "AdminService",
           "Exception refreshing queues");
       throw RPCUtil.getRemoteException(ioe);
@@ -484,4 +504,49 @@ public UpdateNodeResourceResponse updateNodeResource(
     return response;
   }
 
+  private String getConfigurationFileName(String name)
+      throws YarnException {
+    if (name.equalsIgnoreCase("refreshQueues")) {
+      String schedulerClassName =
+          getConfig().get(YarnConfiguration.RM_SCHEDULER,
+              YarnConfiguration.DEFAULT_RM_SCHEDULER);
+      if (schedulerClassName.contains("CapacityScheduler")) {
+        return "capacity-scheduler.xml";
+      } else if (schedulerClassName.contains("FairScheduler")) {
+        return "fair-scheduler.xml";
+      }
+      throw new YarnException("Invalid scheduler class: " + schedulerClassName);
+    } else if (name.equalsIgnoreCase("refreshSuperUserGroupsConfiguration")
+        || name.equalsIgnoreCase("refreshUserToGroupsMappings")) {
+      return "core-site.xml";
+    } else if (name.equalsIgnoreCase("refreshAdminAcls")) {
+      return "yarn-site.xml";
+    } else if (name.equalsIgnoreCase("refreshServiceAcls")) {
+      return "hadoop-policy.xml";
+    } else {
+      throw new YarnException("Invalid refresh function: " + name);
+    }
+  }
+
+  private String printFailureDescription(String name, String fName) {
+    String remoteDir = getConfig().get(YarnConfiguration.RM_CONF_STORE,
+        YarnConfiguration.DEFAULT_RM_CONF_STORE);
+    return "Failed to execute " + fName +
+        ". To make it work with HA, you should upload " + name +
+        " into the remote directory: " + remoteDir;
+  }
+
+  private synchronized Configuration getConfiguration(String name)
+      throws YarnException, IOException {
+    if (!this.rmContext.isHAEnabled()) {
+      return new Configuration();
+    }
+    return remoteConfiguration.getConfiguration(
+        getConfigurationFileName(name));
+  }
+
+  @VisibleForTesting
+  public synchronized RemoteConfiguration getRemoteConfiguration() {
+    return this.remoteConfiguration;
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 0197c5b..d3513eb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -281,7 +281,11 @@ public Resource getClusterResources() {
     } else {
       CapacitySchedulerConfiguration oldConf = this.conf;
-      this.conf = new CapacitySchedulerConfiguration(conf);
+      if (this.rmContext.isHAEnabled()) {
+        this.conf = new CapacitySchedulerConfiguration(conf, true);
+      } else {
+        this.conf = new CapacitySchedulerConfiguration(conf);
+      }
       validateConf(this.conf);
       try {
         LOG.info("Re-initializing queues...");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 6fceabf..b3b4980 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -143,7 +143,17 @@ public CapacitySchedulerConfiguration(Configuration configuration) {
     super(configuration);
     addResource(CS_CONFIGURATION_FILE);
   }
-
+
+  public CapacitySchedulerConfiguration(Configuration configuration,
+      boolean useRemoteConfiguration) {
+    super(configuration);
+    if (useRemoteConfiguration) {
+      reloadConfiguration();
+    } else {
+      addResource(CS_CONFIGURATION_FILE);
+    }
+  }
+
   private String getQueuePrefix(String queue) {
     String queueName = PREFIX + queue + DOT;
     return queueName;
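Putting the pieces together, the refresh flow this patch enables on an HA cluster is: push the edited file into the shared store, then trigger the normal refresh. A hedged sketch of the client side follows; the paths are illustrative assumptions.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.FileSystemBasedRemoteConfiguration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RefreshQueuesOnHAExample {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.set(YarnConfiguration.RM_CONF_STORE, "/rm/confStore");

    // 1. Upload the edited capacity-scheduler.xml into the shared store so
    //    that the active and the standby RM both read the same file.
    FileSystemBasedRemoteConfiguration remote =
        new FileSystemBasedRemoteConfiguration();
    remote.init(conf);
    remote.uploadToRemoteFileSystem(new Path("/tmp/capacity-scheduler.xml"));
    remote.close();

    // 2. Trigger the refresh on the active RM, e.g. with the existing CLI:
    //      yarn rmadmin -refreshQueues
    //    AdminService#refreshQueues then reinitializes the scheduler from the
    //    store instead of the RM's local capacity-scheduler.xml.
  }
}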