From 4767cc4517dbb9858eb2d78c5ca07db5c51b974f Mon Sep 17 00:00:00 2001 From: Sunil G Date: Mon, 27 Feb 2017 11:55:31 +0530 Subject: [PATCH] YARN-6123 --- .../scheduler/capacity/ParentQueue.java | 3 ++ .../PriorityUtilizationQueueOrderingPolicy.java | 6 +++ .../TestCapacitySchedulerSurgicalPreemption.java | 4 +- .../scheduler/capacity/TestQueueParsing.java | 58 ++++++++++++++++++++++ 4 files changed, 69 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 75ab610..0e5b884 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -343,6 +343,9 @@ public void reinitialize(CSQueue newlyParsedQueue, // Re-sort all queues childQueues.clear(); childQueues.addAll(currentChildQueues.values()); + + // Make sure we notifies QueueOrderingPolicy + queueOrderingPolicy.setQueues(childQueues); } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java index fb25414..ada665d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; @@ -182,4 +183,9 @@ public String getConfigName() { return CapacitySchedulerConfiguration.QUEUE_UTILIZATION_ORDERING_POLICY; } } + + @VisibleForTesting + public List getQueues() { + return queues; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java index e64c4df..3f78013 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java @@ -728,12 +728,12 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() // Call editSchedule again: selected containers are killed editPolicy.editSchedule(); + waitNumberOfLiveContainersFromApp(schedulerApp1, 4); - // Do allocation for all nms + // Make sure the container killed, then do allocation for all nms for (int i = 0; i < 4; i++) { cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); } - waitNumberOfLiveContainersFromApp(schedulerApp1, 4); waitNumberOfLiveContainersFromApp(schedulerApp1, 4); waitNumberOfLiveContainersFromApp(schedulerApp2, 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java index 92baa85..5d167c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java @@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +34,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -936,4 +941,57 @@ public RMNodeLabelsManager createNodeLabelManager() { IOUtils.closeStream(rm); } } + + + @Test + public void testQueueOrderingPolicyUpdatedAfterReinitialize() + throws IOException { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfigurationWithoutLabels(csConf); + YarnConfiguration conf = new YarnConfiguration(csConf); + + CapacityScheduler capacityScheduler = new CapacityScheduler(); + RMContextImpl rmContext = + new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null); + rmContext.setNodeLabelManager(nodeLabelManager); + capacityScheduler.setConf(conf); + capacityScheduler.setRMContext(rmContext); + capacityScheduler.init(conf); + capacityScheduler.start(); + + // Add a new b4 queue + csConf.setQueues(CapacitySchedulerConfiguration.ROOT + ".b", + new String[] { "b1", "b2", "b3", "b4" }); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b.b4", 0f); + ParentQueue bQ = (ParentQueue) capacityScheduler.getQueue("b"); + checkEqualsToQueueSet(bQ.getChildQueues(), + new String[] { "b1", "b2", "b3" }); + capacityScheduler.reinitialize(new YarnConfiguration(csConf), rmContext); + + // Check child queue of b + checkEqualsToQueueSet(bQ.getChildQueues(), + new String[] { "b1", "b2", "b3", "b4" }); + + PriorityUtilizationQueueOrderingPolicy queueOrderingPolicy = + (PriorityUtilizationQueueOrderingPolicy) bQ.getQueueOrderingPolicy(); + checkEqualsToQueueSet(queueOrderingPolicy.getQueues(), + new String[] { "b1", "b2", "b3", "b4" }); + + ServiceOperations.stopQuietly(capacityScheduler); + } + + private void checkEqualsToQueueSet(List queues, String[] queueNames) { + Set existedQueues = new HashSet<>(); + for (CSQueue q : queues) { + existedQueues.add(q.getQueueName()); + } + for (String q : queueNames) { + Assert.assertTrue(existedQueues.remove(q)); + } + Assert.assertTrue(existedQueues.isEmpty()); + } } -- 2.10.1 (Apple Git-78)