commit 8ef196c1b946f421412117ced4ed1bd5e978b0fb
Author: Wangda Tan
Date:   Thu Nov 3 17:28:01 2016 -0700

    RMP-7974

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 5347074..6cdaeb8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -213,6 +213,15 @@ public void init(Configuration config, RMContext context,
     if (isIntraQueuePreemptionEnabled) {
       candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this));
     }
+
+    // Do we need the white-list preemption policy?
+    boolean isWhiteListPreemptionEnabled = csConfig.getBoolean(
+        CapacitySchedulerConfiguration.WHITE_LIST_PREEMPTION_ENABLED,
+        CapacitySchedulerConfiguration.DEFAULT_WHITE_LIST_PREEMPTION_ENABLED);
+    if (isWhiteListPreemptionEnabled) {
+      candidatesSelectionPolicies.add(
+          new WhitelistQueueContainerCandidateSelector(this));
+    }
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/WhitelistQueueContainerCandidateSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/WhitelistQueueContainerCandidateSelector.java
new file mode 100644
index 0000000..f80d173
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/WhitelistQueueContainerCandidateSelector.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class WhitelistQueueContainerCandidateSelector
+    extends PreemptionCandidatesSelector {
+  private static final Log LOG =
+      LogFactory.getLog(WhitelistQueueContainerCandidateSelector.class);
+
+  // Queues that are allowed to preempt resources from other queues; we will
+  // never preempt containers that belong to queues in this list
+  private Set<String> queueWhiteList;
+
+  // Configured timeout before doing reserved container preemption
+  private long minTimeout;
+
+  // All reserved containers in the system that belong to white-listed queues
+  private List<RMContainer> reservedContainers;
+
+  private Clock clock = new SystemClock();
+
+  private static final Comparator<RMContainer>
+      CONTAINER_CREATION_TIME_COMPARATOR = new Comparator<RMContainer>() {
+        @Override
+        public int compare(RMContainer o1, RMContainer o2) {
+          return Long.compare(o1.getCreationTime(), o2.getCreationTime());
+        }
+      };
+
+  WhitelistQueueContainerCandidateSelector(
+      CapacitySchedulerPreemptionContext preemptionContext) {
+    super(preemptionContext);
+
+    // Initialize parameters
+    queueWhiteList = new HashSet<>();
+    CapacitySchedulerConfiguration csc =
+        preemptionContext.getScheduler().getConfiguration();
+    String whiteListQueues = csc.get(
+        CapacitySchedulerConfiguration.WHITE_LIST_PREEMPTION_QUEUES,
+        CapacitySchedulerConfiguration.DEFAULT_WHITE_LIST_PREEMPTION_QUEUES);
+    for (String q : whiteListQueues.split(",")) {
+      // Skip empty entries, otherwise the default "" makes the list non-empty
+      if (!q.trim().isEmpty()) {
+        queueWhiteList.add(q.trim());
+        LOG.info("Added queue=" + q.trim() + " to white-list");
+      }
+    }
+
+    minTimeout = csc.getLong(
+        CapacitySchedulerConfiguration.WHITE_LIST_PREEMPTION_TIMEOUT_MS,
+        CapacitySchedulerConfiguration.DEFAULT_WHITE_LIST_PREEMPTION_TIMEOUT_MS);
+  }
+
+  @Override
+  public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource clusterResource,
+      Resource totalPreemptedResourceAllowed) {
+    if (queueWhiteList.isEmpty()) {
+      return selectedCandidates;
+    }
+
+    reservedContainers = new ArrayList<>();
+
+    // Add all reserved containers for analysis
+    for (FiCaSchedulerNode node : preemptionContext.getScheduler()
+        .getAllNodes()) {
+      RMContainer reservedContainer = node.getReservedContainer();
+      if (null != reservedContainer) {
+        if (queueWhiteList.contains(reservedContainer.getQueueName())) {
+          reservedContainers.add(reservedContainer);
+        }
+      }
+    }
+
+    // Sort reserved containers by creation time
+    Collections.sort(reservedContainers, CONTAINER_CREATION_TIME_COMPARATOR);
+
+    // Use the injected clock so tests can control time (see setClock)
+    long currentTime = clock.getTime();
+
+    // From the beginning of the list (oldest reservation first)
+    for (RMContainer reservedContainer : reservedContainers) {
+      if (currentTime - reservedContainer.getCreationTime() < minTimeout) {
+        break;
+      }
+
+      FiCaSchedulerNode node =
          preemptionContext.getScheduler().getNode(
+              reservedContainer.getReservedNode());
+      if (null == node) {
+        // Something is wrong, ignore
+        continue;
+      }
+
+      // Resource still lacking to allocate the reserved container:
+      // container_reserved_resource - (node.total - node.allocated)
+      Resource lacking = Resources.subtract(
+          reservedContainer.getReservedResource(), Resources
+              .subtract(node.getTotalResource(), node.getAllocatedResource()));
+
+      // On each host, simply check if we could preempt containers from
+      // non-whitelist queues or not
+      List<RMContainer> runningContainers =
+          node.getCopiedListOfRunningContainers();
+      Collections.sort(runningContainers, CONTAINER_CREATION_TIME_COMPARATOR);
+
+      // First of all, consider already selected containers
+      for (RMContainer runningContainer : runningContainers) {
+        if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(
+            runningContainer, selectedCandidates)) {
+          Resources.subtractFrom(lacking,
+              runningContainer.getAllocatedResource());
+        }
+      }
+
+      // If we can already allocate the reserved container after the
+      // preemptions selected so far, skip the following steps
+      if (Resources.fitsIn(rc, clusterResource, lacking,
+          Resources.none())) {
+        continue;
+      }
+
+      Resource allowed = Resources.clone(totalPreemptedResourceAllowed);
+      Resource selected = Resources.createResource(0);
+      Set<RMContainer> tmpSelected = new HashSet<>();
+
+      for (RMContainer runningContainer : runningContainers) {
+        // Only preempt resources from queues not in the white-list
+        if (queueWhiteList.contains(runningContainer.getQueueName())
+            || CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(
+                runningContainer, selectedCandidates)) {
+          continue;
+        }
+
+        // Do not allow preempting more than the limit
+        if (Resources.greaterThanOrEqual(rc, clusterResource, allowed,
+            runningContainer.getAllocatedResource())) {
+          Resources.subtractFrom(allowed,
+              runningContainer.getAllocatedResource());
+          Resources.subtractFrom(lacking, runningContainer.getAllocatedResource());
+          Resources.addTo(selected, runningContainer.getAllocatedResource());
+          tmpSelected.add(runningContainer);
+        }
+
+        // lacking <= 0 means we can allocate the reserved container
+        if (Resources.fitsIn(rc, clusterResource, lacking, Resources.none())) {
+          break;
+        }
+      }
+
+      // Commit the selected containers only if preempting them is enough to
+      // allocate the reserved container
+      if (Resources.fitsIn(rc, clusterResource, lacking, Resources.none())) {
+        for (RMContainer c : tmpSelected) {
+          Set<RMContainer> containers = selectedCandidates.get(
+              c.getApplicationAttemptId());
+          if (null == containers) {
+            containers = new HashSet<>();
+            selectedCandidates.put(c.getApplicationAttemptId(), containers);
+          }
+          containers.add(c);
+        }
+      }
+    }
+
+    return selectedCandidates;
+  }
+
+  @VisibleForTesting
+  public void setClock(Clock clock) {
+    this.clock = clock;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index cea5aa4..b23219a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -1129,4 +1129,30 @@ public boolean getLazyPreemptionEnabled() {
       INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "max-allowable-limit";
   public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
       0.2f;
+
+  /**
+   * Configuration for white-list queue preemption.
+   */
+  @Private
+  private static final String WHITE_LIST_PREEMPTION_CONFIG_PREFIX =
+      PREEMPTION_CONFIG_PREFIX + "white-list-queue-preemption.";
+
+  @Private
+  public static final String WHITE_LIST_PREEMPTION_ENABLED =
+      WHITE_LIST_PREEMPTION_CONFIG_PREFIX + "enabled";
+  @Private
+  public static final boolean DEFAULT_WHITE_LIST_PREEMPTION_ENABLED = false;
+
+  @Private
+  public static final String WHITE_LIST_PREEMPTION_QUEUES =
+      WHITE_LIST_PREEMPTION_CONFIG_PREFIX + "queues";
+  @Private
+  public static final String DEFAULT_WHITE_LIST_PREEMPTION_QUEUES = "";
+
+  @Private
+  public static final String WHITE_LIST_PREEMPTION_TIMEOUT_MS =
+      WHITE_LIST_PREEMPTION_CONFIG_PREFIX + "timeout-ms";
+  // 60 sec
+  @Private
+  public static final long DEFAULT_WHITE_LIST_PREEMPTION_TIMEOUT_MS = 60000L;
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
index bd9f615..805f4a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
@@ -61,6 +61,7 @@ void setUp() throws Exception {
     conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
         ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
     conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 100 * GB);
 
     // Set preemption related configurations
     conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
index db6115c..7b2b0c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
@@ -167,8 +167,7 @@ public void testSurgicalPreemptionWithAvailableResource()
     *
     * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
     *
-    * 2) app1 submit to queue-a first, it asked 38 * 1G containers
-    * We will allocate 20 on n1 and 19 on n2.
+    * 2) app1 submit to queue-b, asks for 1G * 5
     *
     * 3) app2 submit to queue-c, ask for one 4G container (for AM)
     *
@@ -243,4 +242,103 @@ public void testSurgicalPreemptionWithAvailableResource()
 
     rm1.close();
   }
+
+  @Test(timeout = 60000)
+  public void testSurgicalPreemptionWhenAllQueuesAreBelowGuaranteedCapacities()
+      throws Exception {
+    /**
+     * Test case: Submit two applications (app1/app2) to different queues,
+     * queue structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
+     *
+     * 2) app1 submit to queue-b first, it asks for 6 * 1G containers
+     * (7 live containers including the AM: 4 on n1 and 3 on n2).
+     *
+     * 3) app2 submit to queue-c, ask for one 18G container (for AM)
+     *
+     * After preemption, we should expect:
+     * Preempt 2 containers from app1 and AM of app2 successfully allocated.
+     */
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.WHITE_LIST_PREEMPTION_ENABLED, true);
+    conf.set(CapacitySchedulerConfiguration.WHITE_LIST_PREEMPTION_QUEUES, "c");
+    conf.setInt(CapacitySchedulerConfiguration.WHITE_LIST_PREEMPTION_TIMEOUT_MS,
+        1000);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // Launch an app in queue-b; the AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<>());
+
+    // Do allocation for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, so the abs-used-cap of b is
+    // 7 / 40 = 17.5% < 20% (guaranteed)
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    // 4 from n1 and 3 from n2
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+        am1.getApplicationAttemptId(), 4);
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+        am1.getApplicationAttemptId(), 3);
+
+    // Submit app2 to queue-c, which asks for an 18G container for its AM
+    RMApp app2 = rm1.submitApp(18 * GB, "app", "user", null, "c");
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() == null) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      Thread.sleep(10);
+    }
+
+    // Call editSchedule immediately: no containers selected (timeout not hit)
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Sleep past the timeout interval; now containers should be selected
+    Thread.sleep(1000);
+    editPolicy.editSchedule();
+    Assert.assertEquals(2, editPolicy.getToPreemptContainers().size());
+
+    // Call editSchedule again: selected containers are killed, and the new AM
+    // container gets launched
+    editPolicy.editSchedule();
+
+    // Do allocation till the reserved container is allocated
+    while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      Thread.sleep(10);
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
+
+    rm1.close();
+  }
 }
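
Usage sketch (not part of the patch): the new selector is driven entirely by the
three CapacitySchedulerConfiguration keys added above. A minimal way to enable
it from code, mirroring the settings used in the new test; the queue name "a"
is a placeholder, and the yarn-site key names noted in the comments assume
PREEMPTION_CONFIG_PREFIX resolves to
"yarn.resourcemanager.monitor.capacity.preemption." as in upstream Hadoop.

    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    // ...white-list-queue-preemption.enabled: turn the selector on
    conf.setBoolean(
        CapacitySchedulerConfiguration.WHITE_LIST_PREEMPTION_ENABLED, true);
    // ...white-list-queue-preemption.queues: comma-separated queues that may
    // preempt from others and are themselves never preempted from
    // ("a" is a placeholder queue name)
    conf.set(CapacitySchedulerConfiguration.WHITE_LIST_PREEMPTION_QUEUES, "a");
    // ...white-list-queue-preemption.timeout-ms: how long a container must
    // stay reserved before the selector preempts for it (default 60000 ms)
    conf.setLong(
        CapacitySchedulerConfiguration.WHITE_LIST_PREEMPTION_TIMEOUT_MS,
        60000L);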