diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 08b38a1707e..7aabb80d7ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -2185,6 +2185,18 @@ public void setAutoCreatedQueuesV2MaxChildQueuesLimit(String queuePath, public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL = 1500L; + /** + * Time in milliseconds between invocations + * of QueueConfigurationAutoRefreshPolicy + */ + @Private + public static final String QUEUE_AUTO_REFRESH_MONITORING_INTERVAL = + PREFIX + "queue.auto.refresh.monitoring-interval"; + + @Private + public static final long DEFAULT_QUEUE_AUTO_REFRESH_MONITORING_INTERVAL = + 5000L; + /** * Queue Management computation policy for Auto Created queues * @param queue The queue's path diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueConfigurationAutoRefreshPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueConfigurationAutoRefreshPolicy.java new file mode 100644 index 00000000000..597e9e8a5f1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueConfigurationAutoRefreshPolicy.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + +import java.io.IOException; + +/** + * Queue auto refresh policy for queues + */ +public class QueueConfigurationAutoRefreshPolicy implements SchedulingEditPolicy { + + private static final Logger LOG = + LoggerFactory.getLogger(QueueConfigurationAutoRefreshPolicy.class); + + private Clock clock; + + // Pointer to other RM components + private RMContext rmContext; + private ResourceCalculator rc; + private CapacityScheduler scheduler; + private RMNodeLabelsManager nlm; + + private long monitoringInterval; + + /** + * Instantiated by CapacitySchedulerConfiguration + */ + public QueueConfigurationAutoRefreshPolicy() { + clock = SystemClock.getInstance(); + } + + @Override + public void init(final Configuration config, final RMContext context, + final ResourceScheduler sched) { + LOG.info("Queue auto refresh Policy monitor: {}" + this. + getClass().getCanonicalName()); + assert null == scheduler : "Unexpected duplicate call to init"; + if (!(sched instanceof CapacityScheduler)) { + throw new YarnRuntimeException("Class " + + sched.getClass().getCanonicalName() + " not instance of " + + CapacityScheduler.class.getCanonicalName()); + } + rmContext = context; + scheduler = (CapacityScheduler) sched; + clock = scheduler.getClock(); + + rc = scheduler.getResourceCalculator(); + nlm = scheduler.getRMContext().getNodeLabelManager(); + + CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration(); + + monitoringInterval = csConfig.getLong( + CapacitySchedulerConfiguration.QUEUE_AUTO_REFRESH_MONITORING_INTERVAL, + CapacitySchedulerConfiguration. + DEFAULT_QUEUE_AUTO_REFRESH_MONITORING_INTERVAL); + } + + + @Override + public void editSchedule() { + long startTs = clock.getTime(); + + try { + rmContext.getRMAdminService().refreshQueues(); + LOG.info("Auto refreshed queue successfully!"); + } catch (IOException | YarnException e) { + LOG.error("Can't refresh queue: " + e.getMessage()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms."); + } + } + + + @Override + public long getMonitoringInterval() { + return monitoringInterval; + } + + @Override + public String getPolicyName() { + return QueueConfigurationAutoRefreshPolicy.class.getCanonicalName(); + } + + public ResourceCalculator getResourceCalculator() { + return rc; + } + + public RMContext getRmContext() { + return rmContext; + } + + public CapacityScheduler getScheduler() { + return scheduler; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java index bc540b0ba7f..aadf80fbdb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.HashSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; @@ -91,6 +92,8 @@ import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_PROXY_USER_PREFIX; import static org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration.NODES; import static org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration.PREFIX; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueConfigurationAutoRefreshPolicy; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -243,6 +246,70 @@ public void testAdminRefreshQueuesWithFileSystemBasedConfigurationProvider() Assert.assertTrue(maxAppsAfter != maxAppsBefore); } + @Test + public void testAutoRefreshQueuesWithFileSystemBasedConfigurationProvider() + throws IOException, YarnException { + configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); + + configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); + configuration.setBoolean( + YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + configuration.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + QueueConfigurationAutoRefreshPolicy.class.getCanonicalName()); + + // Set auto refresh interval to 1s + configuration.setLong(CapacitySchedulerConfiguration.QUEUE_AUTO_REFRESH_MONITORING_INTERVAL, + 1000L); + + Set policies = new HashSet<>(); + policies.add( + QueueConfigurationAutoRefreshPolicy.class.getCanonicalName()); + + //upload default configurations + uploadDefaultConfiguration(); + + // upload the auto refresh related configurations + uploadConfiguration(configuration, "yarn-site.xml"); + uploadConfiguration(configuration, "capacity-scheduler.xml"); + + try { + rm = new MockRM(configuration); + rm.init(configuration); + rm.start(); + } catch(Exception ex) { + fail("Should not get any exceptions"); + } + CapacityScheduler cs = + (CapacityScheduler) rm.getRMContext().getScheduler(); + + Assert.assertTrue( + "No AutoCreatedQueueDeletionPolicy " + + "is present in running monitors", + cs.getSchedulingMonitorManager(). + isSameConfiguredPolicies(policies)); + + int maxAppsBefore = cs.getConfiguration().getMaximumSystemApplications(); + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.set(CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS, + "5000"); + uploadConfiguration(csConf, "capacity-scheduler.xml"); + + try { + Thread.sleep(3000); + } catch (Exception e) { + + } + + int maxAppsAfter = cs.getConfiguration().getMaximumSystemApplications(); + Assert.assertEquals(maxAppsAfter, 5000); + Assert.assertTrue(maxAppsAfter != maxAppsBefore); + } + + @Test public void testAdminRefreshQueuesWithMutableSchedulerConfiguration() { configuration.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,