diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 75ab610..0e5b884 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -343,6 +343,9 @@ public void reinitialize(CSQueue newlyParsedQueue, // Re-sort all queues childQueues.clear(); childQueues.addAll(currentChildQueues.values()); + + // Make sure we notifies QueueOrderingPolicy + queueOrderingPolicy.setQueues(childQueues); } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java index fe60611..0544387 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; @@ -183,4 +184,9 @@ public String getConfigName() { return CapacitySchedulerConfiguration.QUEUE_UTILIZATION_ORDERING_POLICY; } } + + @VisibleForTesting + public List getQueues() { + return queues; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java index 5989da0..4a37bef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java @@ -728,12 +728,12 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() // Call editSchedule again: selected containers are killed editPolicy.editSchedule(); + waitNumberOfLiveContainersFromApp(schedulerApp1, 4); - // Do allocation for all nms + // Make sure the container killed, then do allocation for all nms for (int i = 0; i < 4; i++) { cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); } - waitNumberOfLiveContainersFromApp(schedulerApp1, 4); waitNumberOfLiveContainersFromApp(schedulerApp1, 4); waitNumberOfLiveContainersFromApp(schedulerApp2, 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java index 92baa85..3da3fde 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java @@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +34,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -936,4 +941,55 @@ public RMNodeLabelsManager createNodeLabelManager() { IOUtils.closeStream(rm); } } + + + @Test + public void testQueueOrderingPolicyUpdatedAfterReinitialize() + throws IOException { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfigurationWithoutLabels(csConf); + YarnConfiguration conf = new YarnConfiguration(csConf); + + CapacityScheduler capacityScheduler = new CapacityScheduler(); + RMContextImpl rmContext = + new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null); + rmContext.setNodeLabelManager(nodeLabelManager); + capacityScheduler.setConf(conf); + capacityScheduler.setRMContext(rmContext); + capacityScheduler.init(conf); + capacityScheduler.start(); + + // Add a new b4 queue + csConf.setQueues(CapacitySchedulerConfiguration.ROOT + ".b", + new String[] { "b1", "b2", "b3", "b4" }); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b.b4", 0f); + capacityScheduler.reinitialize(new YarnConfiguration(csConf), rmContext); + + // Check child queue of b + ParentQueue bQ = (ParentQueue) capacityScheduler.getQueue("b"); + Assert.assertEquals(4, bQ.getChildQueues().size()); + checkEqualsToQueueSet(bQ.getChildQueues(), + new String[] { "b1", "b2", "b3", "b4" }); + + PriorityUtilizationQueueOrderingPolicy queueOrderingPolicy = + (PriorityUtilizationQueueOrderingPolicy) bQ.getQueueOrderingPolicy(); + checkEqualsToQueueSet(queueOrderingPolicy.getQueues(), + new String[] { "b1", "b2", "b3", "b4" }); + + ServiceOperations.stopQuietly(capacityScheduler); + } + + private void checkEqualsToQueueSet(List queues, String[] queueNames) { + Set existedQueues = new HashSet<>(); + for (CSQueue q : queues) { + existedQueues.add(q.getQueueName()); + } + for (String q : queueNames) { + Assert.assertTrue(existedQueues.remove(q)); + } + } }