diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitorManager.java index 0cc700d9c11..6cdbcf035f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitorManager.java @@ -75,32 +75,10 @@ private void updateSchedulingMonitors(Configuration conf, // Add new monitor when needed for (String s : configurePoliciesSet) { if (!runningSchedulingMonitors.containsKey(s)) { - Class policyClass; - try { - policyClass = Class.forName(s); - } catch (ClassNotFoundException e) { - String message = "Failed to find class of specified policy=" + s; - LOG.warn(message); - throw new YarnException(message); - } - - if (SchedulingEditPolicy.class.isAssignableFrom(policyClass)) { - SchedulingEditPolicy policyInstance = - (SchedulingEditPolicy) ReflectionUtils.newInstance(policyClass, - null); - SchedulingMonitor mon = new SchedulingMonitor(rmContext, - policyInstance); - mon.init(conf); - if (startImmediately) { - mon.start(); - } - runningSchedulingMonitors.put(s, mon); - } else { - String message = - "Specified policy=" + s + " is not a SchedulingEditPolicy class."; - LOG.warn(message); - throw new YarnException(message); - } + SchedulingMonitor mon = initializeSchedulingMonitor(conf, s, + startImmediately); + + runningSchedulingMonitors.put(s, mon); } } @@ -115,6 +93,38 @@ private void updateSchedulingMonitors(Configuration conf, } } + public synchronized SchedulingMonitor initializeSchedulingMonitor + (Configuration conf, + String clazz, boolean + startImmediately) + throws YarnException { + Class policyClass; + try { + policyClass = Class.forName(clazz); + } catch (ClassNotFoundException e) { + String message = "Failed to find class of specified policy=" + clazz; + LOG.warn(message); + throw new YarnException(message); + } + + if (SchedulingEditPolicy.class.isAssignableFrom(policyClass)) { + SchedulingEditPolicy policyInstance = + (SchedulingEditPolicy) ReflectionUtils.newInstance(policyClass, null); + SchedulingMonitor mon = new SchedulingMonitor(rmContext, policyInstance); + mon.init(conf); + if (startImmediately) { + mon.start(); + } + + return mon; + } else{ + String message = + "Specified policy=" + clazz + " is not a SchedulingEditPolicy class."; + LOG.warn(message); + throw new YarnException(message); + } + } + public synchronized void initialize(RMContext rmContext, Configuration configuration) throws YarnException { this.rmContext = rmContext; @@ -181,4 +191,26 @@ public SchedulingMonitor getAvailableSchedulingMonitor() { public synchronized void stop() throws YarnException { stopAndRemoveAll(); } + + public synchronized void addSchedulingEditPolicy(Configuration conf, String + policyClazz, boolean startImmediately) throws YarnException { + if (runningSchedulingMonitors.get(policyClazz) == null) { + + SchedulingMonitor schedulingMonitor = initializeSchedulingMonitor(conf, + policyClazz, startImmediately); + + runningSchedulingMonitors.put(policyClazz, schedulingMonitor); + } else { + LOG.info("Scheduling Edit policy already exists. Skipping add : " + + runningSchedulingMonitors.get(policyClazz).getClass()); + } + } + + public boolean hasSchedulingEditPolicy(String policyClazz) { + if (runningSchedulingMonitors.get(policyClazz) == null) { + return false; + } + + return true; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 37f56deb119..3fbc2dd73e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -427,6 +427,10 @@ public void serviceInit(Configuration conf) throws Exception { initScheduler(configuration); // Initialize SchedulingMonitorManager schedulingMonitorManager.initialize(rmContext, conf); + + QueueManagementDynamicEditPolicy.configureSchedulingMonitor + (schedulingMonitorManager, getCapacitySchedulerQueueManager(), conf, + false); } @Override @@ -486,6 +490,19 @@ public void reinitialize(Configuration newConf, RMContext rmContext) offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); super.reinitialize(newConf, rmContext); + + try { + QueueManagementDynamicEditPolicy.configureSchedulingMonitor + (schedulingMonitorManager, getCapacitySchedulerQueueManager(), + newConf, + true); + } catch (YarnException e) { + throw new IOException("Failed to re-init queue management policy for " + + "auto-created dynamic queues" + + " : " + + e.getMessage(), + e); + } } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java index 9b0cf7bc93b..7ccf48bb840 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java @@ -26,6 +26,10 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; + +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.monitor + .SchedulingMonitorManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -269,4 +273,32 @@ public CapacityScheduler getScheduler() { public Set getManagedParentQueues() { return managedParentQueues; } + + public static void configureSchedulingMonitor(SchedulingMonitorManager + schedulingMonitorManager, CapacitySchedulerQueueManager + queueManager, Configuration + conf, + boolean startImmediately) throws YarnException { + + if (!schedulingMonitorManager.hasSchedulingEditPolicy( + QueueManagementDynamicEditPolicy.class.getName())) { + + boolean configureSchedulingEditPolicyForManagedQueues = false; + for (Map.Entry queues : queueManager + .getQueues().entrySet()) { + + CSQueue queue = queues.getValue(); + + if (queue instanceof ManagedParentQueue) { + configureSchedulingEditPolicyForManagedQueues = true; + break; + } + } + + if (configureSchedulingEditPolicyForManagedQueues) { + schedulingMonitorManager.addSchedulingEditPolicy(conf, + QueueManagementDynamicEditPolicy.class.getName(), startImmediately); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java index 0a530dd3c9b..997a25f5171 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java @@ -303,7 +303,7 @@ public void testConvertLeafQueueToParentQueueWithAutoCreate() new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(newConf), new NMTokenSecretManagerInRM(newConf), - new ClientToAMTokenSecretManagerInRM(), null)); + new ClientToAMTokenSecretManagerInRM(), newCS)); } finally { newCS.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueManagementDynamicEditPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueManagementDynamicEditPolicy.java index 66b488db275..1227fe2ec80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueManagementDynamicEditPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueManagementDynamicEditPolicy.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.monitor + .SchedulingMonitorManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity .queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy; @@ -31,6 +34,8 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler .capacity.CSQueueUtils.EPSILON; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class TestQueueManagementDynamicEditPolicy extends TestCapacitySchedulerAutoCreatedQueueBase { @@ -103,8 +108,8 @@ public void testEditSchedule() throws Exception { validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.5f, 0.1f, 1.0f, 1.0f); - validateCapacitiesByLabel((ManagedParentQueue) parentQueue, (AutoCreatedLeafQueue) user3LeafQueue, - NODEL_LABEL_GPU); + validateCapacitiesByLabel((ManagedParentQueue) parentQueue, + (AutoCreatedLeafQueue) user3LeafQueue, NODEL_LABEL_GPU); } finally { cleanupQueue(USER1); @@ -126,4 +131,22 @@ private void waitForPolicyState(float expectedVal, } } } + + @Test + public void testSchedulingMonitorConfiguration() throws YarnException { + SchedulingMonitorManager schedulingMonitorManager = + new SchedulingMonitorManager(); + schedulingMonitorManager.initialize(cs.getRMContext(), cs + .getConfiguration()); + + assertFalse(schedulingMonitorManager.hasSchedulingEditPolicy + (QueueManagementDynamicEditPolicy.class.getName())); + + QueueManagementDynamicEditPolicy.configureSchedulingMonitor + (schedulingMonitorManager, cs.getCapacitySchedulerQueueManager(), cs + .getConfiguration(), false); + + assertTrue(schedulingMonitorManager.hasSchedulingEditPolicy + (QueueManagementDynamicEditPolicy.class.getName())); + } }