diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedConfigurationProvider.java
index 3532d13f4a9..156468e4f48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/FileSystemBasedConfigurationProvider.java
@@ -92,4 +92,19 @@ public synchronized void initInternal(Configuration bootstrapConf)
   public synchronized void closeInternal() throws Exception {
     fs.close();
   }
+
+  /**
+   * Returns the file system held by this provider. It is closed by
+   * {@link #closeInternal()}, so callers must not close it themselves.
+   */
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  /**
+   * Returns the configuration directory path held by this provider.
+   */
+  public Path getConfigDir() {
+    return configDir;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 21d62480fe0..713fbc4149f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -3385,6 +3385,13 @@ public MutableConfigurationProvider getMutableConfProvider() {
     return null;
   }
 
+  /**
+   * Returns the {@link CSConfigurationProvider} held by this scheduler.
+   */
+  public CSConfigurationProvider getCsConfProvider() {
+    return csConfProvider;
+  }
+
   @Override
   public void resetSchedulerMetrics() {
     CapacitySchedulerMetrics.destroy();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 08b38a1707e..7aabb80d7ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -2185,6 +2185,21 @@ public void setAutoCreatedQueuesV2MaxChildQueuesLimit(String queuePath,
   public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL =
       1500L;
 
+  /**
+   * Time in milliseconds between invocations
+   * of QueueConfigurationAutoRefreshPolicy.
+   */
+  @Private
+  public static final String QUEUE_AUTO_REFRESH_MONITORING_INTERVAL =
+      PREFIX + "queue.auto.refresh.monitoring-interval";
+
+  /**
+   * Default value of {@link #QUEUE_AUTO_REFRESH_MONITORING_INTERVAL}.
+   */
+  @Private
+  public static final long DEFAULT_QUEUE_AUTO_REFRESH_MONITORING_INTERVAL =
+      5000L;
+
   /**
    * Queue Management computation policy for Auto Created queues
    * @param queue The queue's path
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueConfigurationAutoRefreshPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueConfigurationAutoRefreshPolicy.java
new file mode 100644
index 00000000000..cf9df9a8b44
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueConfigurationAutoRefreshPolicy.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+
+import java.io.IOException;
+
+
+/**
+ * Scheduling edit policy that reloads the capacity scheduler queues when the
+ * capacity scheduler configuration file is modified. It only acts when the RM
+ * uses a {@link FileSystemBasedConfigurationProvider}.
+ */
+public class QueueConfigurationAutoRefreshPolicy
+    implements SchedulingEditPolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(QueueConfigurationAutoRefreshPolicy.class);
+
+  private Clock clock;
+
+  // Pointer to other RM components
+  private RMContext rmContext;
+  private ResourceCalculator rc;
+  private CapacityScheduler scheduler;
+  private RMNodeLabelsManager nlm;
+
+  private long monitoringInterval;
+  private long lastModified;
+
+  // Last time we successfully reloaded queues
+  private long lastSuccessfulReload;
+  private boolean lastReloadAttemptFailed = false;
+
+  // Path to XML file containing allocations.
+  private Path allocCsFile;
+  private FileSystem fs;
+
+  /**
+   * Creates the policy with the system clock; {@link #init} later replaces
+   * it with the scheduler's clock.
+   */
+  public QueueConfigurationAutoRefreshPolicy() {
+    clock = SystemClock.getInstance();
+  }
+
+  /**
+   * Binds the policy to the capacity scheduler and reads the monitoring
+   * interval from the scheduler's configuration.
+   *
+   * @throws YarnRuntimeException if sched is not a CapacityScheduler.
+   */
+  @Override
+  public void init(final Configuration config, final RMContext context,
+      final ResourceScheduler sched) {
+    LOG.info("Queue auto refresh Policy monitor: {}",
+        this.getClass().getCanonicalName());
+    assert null == scheduler : "Unexpected duplicate call to init";
+    if (!(sched instanceof CapacityScheduler)) {
+      throw new YarnRuntimeException("Class " +
+          sched.getClass().getCanonicalName() + " not instance of " +
+          CapacityScheduler.class.getCanonicalName());
+    }
+    rmContext = context;
+    scheduler = (CapacityScheduler) sched;
+    clock = scheduler.getClock();
+
+    rc = scheduler.getResourceCalculator();
+    nlm = scheduler.getRMContext().getNodeLabelManager();
+
+    CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration();
+
+    monitoringInterval = csConfig.getLong(
+        CapacitySchedulerConfiguration.QUEUE_AUTO_REFRESH_MONITORING_INTERVAL,
+        CapacitySchedulerConfiguration.
+            DEFAULT_QUEUE_AUTO_REFRESH_MONITORING_INTERVAL);
+  }
+
+  /**
+   * Reloads the queues when the capacity scheduler configuration file was
+   * modified after the last successful reload and more than the monitoring
+   * interval has elapsed since that modification.
+   */
+  @Override
+  public void editSchedule() {
+    long startTs = clock.getTime();
+
+    try {
+      // Only handle the FileSystemBasedConfigurationProvider
+      // for auto refresh.
+      if (rmContext.getConfigurationProvider() instanceof
+          FileSystemBasedConfigurationProvider) {
+        FileSystemBasedConfigurationProvider fsConfProvider =
+            ((FileSystemBasedConfigurationProvider)
+                rmContext.getConfigurationProvider());
+        fs = fsConfProvider.getFs();
+
+        // Check if the cs related conf modified
+        allocCsFile = new Path(fsConfProvider.getConfigDir(),
+            YarnConfiguration.CS_CONFIGURATION_FILE);
+        lastModified =
+            fs.getFileStatus(allocCsFile).getModificationTime();
+
+        long time = clock.getTime();
+
+        if (lastModified > lastSuccessfulReload &&
+            time > lastModified + monitoringInterval) {
+          try {
+            rmContext.getRMAdminService().refreshQueues();
+            LOG.info("Auto refreshed queue successfully!");
+            lastSuccessfulReload = clock.getTime();
+            // A success re-arms the one-time failure logging below.
+            lastReloadAttemptFailed = false;
+          } catch (IOException | YarnException e) {
+            LOG.error("Can't refresh queue: {}", e.getMessage());
+            if (!lastReloadAttemptFailed) {
+              LOG.error("Failed to reload capacity scheduler config file - " +
+                  "will use existing conf.", e);
+            }
+            lastReloadAttemptFailed = true;
+          }
+
+        } else if (lastModified == 0L) {
+          if (!lastReloadAttemptFailed) {
+            LOG.warn("Failed to reload capacity scheduler config file because" +
+                " last modified returned 0. File exists: {}",
+                fs.exists(allocCsFile));
+          }
+          lastReloadAttemptFailed = true;
+        }
+      }
+
+    } catch (IOException e) {
+      // toString() keeps the exception type even when the message is null.
+      LOG.error("Can't get file status for refresh : {}", e.toString());
+    }
+
+    LOG.debug("Total time used={} ms.", clock.getTime() - startTs);
+  }
+
+  @VisibleForTesting
+  public long getLastSuccessfulReload() {
+    return lastSuccessfulReload;
+  }
+
+  @VisibleForTesting
+  public long getLastModified() {
+    return lastModified;
+  }
+
+  @VisibleForTesting
+  public boolean getLastReloadAttemptFailed() {
+    return lastReloadAttemptFailed;
+  }
+
+  @Override
+  public long getMonitoringInterval() {
+    return monitoringInterval;
+  }
+
+  @Override
+  public String getPolicyName() {
+    return QueueConfigurationAutoRefreshPolicy.class.getCanonicalName();
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  public Path getAllocCsFile() {
+    return allocCsFile;
+  }
+
+  public ResourceCalculator getResourceCalculator() {
+    return rc;
+  }
+
+  public RMContext getRmContext() {
+    return rmContext;
+  }
+
+  public CapacityScheduler getScheduler() {
+    return scheduler;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
index bc540b0ba7f..1c33fdc494a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
@@ -33,6 +33,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.HashSet;
 
 import 
org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -91,6 +92,8 @@
 import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_PROXY_USER_PREFIX;
 import static org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration.NODES;
 import static org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration.PREFIX;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueConfigurationAutoRefreshPolicy;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -243,6 +246,64 @@ public void testAdminRefreshQueuesWithFileSystemBasedConfigurationProvider()
     Assert.assertTrue(maxAppsAfter != maxAppsBefore);
   }
 
+  @Test
+  public void testAutoRefreshQueuesWithFileSystemBasedConfigurationProvider()
+      throws IOException, YarnException, InterruptedException {
+    configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+        "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
+    configuration.setBoolean(
+        YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    configuration.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+        QueueConfigurationAutoRefreshPolicy.class.getCanonicalName());
+
+    // Set auto refresh interval to 1s
+    configuration.setLong(
+        CapacitySchedulerConfiguration.QUEUE_AUTO_REFRESH_MONITORING_INTERVAL,
+        1000L);
+
+    Set<String> policies = new HashSet<>();
+    policies.add(
+        QueueConfigurationAutoRefreshPolicy.class.getCanonicalName());
+
+    //upload default configurations
+    uploadDefaultConfiguration();
+
+    // upload the auto refresh related configurations
+    uploadConfiguration(configuration, "yarn-site.xml");
+    uploadConfiguration(configuration, "capacity-scheduler.xml");
+
+    try {
+      rm = new MockRM(configuration);
+      rm.init(configuration);
+      rm.start();
+    } catch(Exception ex) {
+      fail("Should not get any exceptions: " + ex);
+    }
+    CapacityScheduler cs =
+        (CapacityScheduler) rm.getRMContext().getScheduler();
+
+    Assert.assertTrue(
+        "No QueueConfigurationAutoRefreshPolicy " +
+            "is present in running monitors",
+        cs.getSchedulingMonitorManager().
+            isSameConfiguredPolicies(policies));
+
+    int maxAppsBefore = cs.getConfiguration().getMaximumSystemApplications();
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    csConf.set(CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS,
+        "5000");
+    uploadConfiguration(csConf, "capacity-scheduler.xml");
+
+    // Give the 1s monitor time to notice the new modification time.
+    Thread.sleep(3000);
+
+    int maxAppsAfter = cs.getConfiguration().getMaximumSystemApplications();
+    Assert.assertEquals(5000, maxAppsAfter);
+    Assert.assertTrue(maxAppsAfter != maxAppsBefore);
+  }
+
   @Test
   public void testAdminRefreshQueuesWithMutableSchedulerConfiguration() {
     configuration.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueConfigurationAutoRefreshPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueConfigurationAutoRefreshPolicy.java
new file mode 100644
index 00000000000..3b8a52b6378
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueConfigurationAutoRefreshPolicy.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * Tests for {@link QueueConfigurationAutoRefreshPolicy}.
+ */
+public class TestQueueConfigurationAutoRefreshPolicy {
+
+  private Configuration configuration;
+  private MockRM rm = null;
+  private FileSystem fs;
+  private Path workingPath;
+  private Path tmpDir;
+  private QueueConfigurationAutoRefreshPolicy policy;
+
+  static {
+    YarnConfiguration.addDefaultResource(
+        YarnConfiguration.CS_CONFIGURATION_FILE);
+    YarnConfiguration.addDefaultResource(
+        YarnConfiguration.DR_CONFIGURATION_FILE);
+  }
+
+  @Before
+  public void setup() throws IOException {
+    QueueMetrics.clearQueueMetrics();
+    DefaultMetricsSystem.setMiniClusterMode(true);
+
+    configuration = new YarnConfiguration();
+    configuration.set(YarnConfiguration.RM_SCHEDULER,
+        CapacityScheduler.class.getCanonicalName());
+    fs = FileSystem.get(configuration);
+    workingPath =
+        new Path(new File("target", this.getClass().getSimpleName()
+            + "-remoteDir").getAbsolutePath());
+    configuration.set(YarnConfiguration.FS_BASED_RM_CONF_STORE,
+        workingPath.toString());
+    tmpDir = new Path(new File("target", this.getClass().getSimpleName()
+        + "-tmpDir").getAbsolutePath());
+    fs.delete(workingPath, true);
+    fs.delete(tmpDir, true);
+    fs.mkdirs(workingPath);
+    fs.mkdirs(tmpDir);
+    policy =
+        new QueueConfigurationAutoRefreshPolicy();
+  }
+
+  private String writeConfigurationXML(Configuration conf, String confXMLName)
+      throws IOException {
+    final File confFile = new File(tmpDir.toString(), confXMLName);
+    if (confFile.exists()) {
+      confFile.delete();
+    }
+    if (!confFile.createNewFile()) {
+      Assert.fail("Can not create " + confXMLName);
+    }
+    try (DataOutputStream output = new DataOutputStream(
+        new FileOutputStream(confFile))) {
+      conf.writeXml(output);
+    }
+    return confFile.getAbsolutePath();
+  }
+
+  private void uploadConfiguration(Configuration conf, String confFileName)
+      throws IOException {
+    String csConfFile = writeConfigurationXML(conf, confFileName);
+    // upload the file into Remote File System
+    uploadToRemoteFileSystem(new Path(csConfFile));
+  }
+
+  private void uploadToRemoteFileSystem(Path filePath)
+      throws IOException {
+    fs.copyFromLocalFile(filePath, workingPath);
+  }
+
+  private void uploadDefaultConfiguration() throws IOException {
+    Configuration conf = new Configuration();
+    uploadConfiguration(conf, "core-site.xml");
+
+    YarnConfiguration yarnConf = new YarnConfiguration();
+    yarnConf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+        "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
+    uploadConfiguration(yarnConf, "yarn-site.xml");
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    uploadConfiguration(csConf, "capacity-scheduler.xml");
+
+    Configuration hadoopPolicyConf = new Configuration(false);
+    hadoopPolicyConf
+        .addResource(YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
+    uploadConfiguration(hadoopPolicyConf, "hadoop-policy.xml");
+  }
+
+  @Test
+  public void testEditSchedule() throws Exception {
+    configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+        "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
+
+    // Set auto refresh interval to 1s
+    configuration.setLong(
+        CapacitySchedulerConfiguration.QUEUE_AUTO_REFRESH_MONITORING_INTERVAL,
+        1000L);
+
+    //upload default configurations
+    uploadDefaultConfiguration();
+
+    // upload the auto refresh related configurations
+    uploadConfiguration(configuration, "yarn-site.xml");
+    uploadConfiguration(configuration, "capacity-scheduler.xml");
+
+    // Let any startup failure propagate so its stack trace is reported.
+    rm = new MockRM(configuration);
+    rm.init(configuration);
+    policy.init(configuration,
+        rm.getRMContext(),
+        rm.getResourceScheduler());
+    rm.start();
+    CapacityScheduler cs =
+        (CapacityScheduler) rm.getRMContext().getScheduler();
+
+    int maxAppsBefore = cs.getConfiguration().getMaximumSystemApplications();
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    csConf.set(CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS,
+        "5000");
+    uploadConfiguration(csConf, "capacity-scheduler.xml");
+
+    // Trigger interval for refresh.
+    Thread.sleep(2000);
+
+    policy.editSchedule();
+
+    Thread.sleep(1000);
+
+    // The reload only fires once the file has been unchanged for longer
+    // than the 1s interval, so the gap must exceed that interval.
+    Assert.assertTrue(
+        policy.getLastSuccessfulReload() - policy.getLastModified() > 1000);
+    Assert.assertFalse(policy.getLastReloadAttemptFailed());
+    long oldModified = policy.getLastModified();
+    long oldSuccess = policy.getLastSuccessfulReload();
+
+    int maxAppsAfter = cs.getConfiguration().getMaximumSystemApplications();
+    Assert.assertEquals(5000, maxAppsAfter);
+    Assert.assertTrue(maxAppsAfter != maxAppsBefore);
+
+    // Without modified
+    policy.editSchedule();
+    Assert.assertEquals(oldModified,
+        policy.getLastModified());
+    Assert.assertEquals(oldSuccess,
+        policy.getLastSuccessfulReload());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (rm != null) {
+      rm.stop();
+    }
+    fs.delete(workingPath, true);
+    fs.delete(tmpDir, true);
+  }
+}