From d559ad67877609dffcdb63c2205fa86ecf3bfb91 Mon Sep 17 00:00:00 2001
From: Prabhu Joseph
Date: Thu, 28 May 2020 16:52:32 +0530
Subject: [PATCH] YARN-10293. Fix Reservation Issue with MultiNodePlacement.

Signed-off-by: Prabhu Joseph
---
 .../resourcemanager/scheduler/SchedulerNode.java   |   3 +-
 .../scheduler/capacity/CapacityScheduler.java      |  28 +-
 .../capacity/TestCapacitySchedulerMultiNodes.java  |   2 +-
 ...tCapacitySchedulerMultiNodesWithPreemption.java | 282 +++++++++++++++++++++
 4 files changed, 290 insertions(+), 25 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index ef03aad..a8ca09b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -324,7 +324,8 @@ public synchronized void containerStarted(ContainerId containerId) {
    * container.
    * @param resource Resources to add.
    */
-  private synchronized void addUnallocatedResource(Resource resource) {
+  @VisibleForTesting
+  public synchronized void addUnallocatedResource(Resource resource) {
     if (resource == null) {
       LOG.error("Invalid resource addition of null resource for "
           + rmNode.getNodeAddress());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 5cef57a..a6aa824 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1721,31 +1721,13 @@ private CSAssignment allocateOrReserveNewContainers(
    */
   private CSAssignment allocateContainersOnMultiNodes(
       CandidateNodeSet<FiCaSchedulerNode> candidates) {
-    // When this time look at multiple nodes, try schedule if the
-    // partition has any available resource or killable resource
-    if (getRootQueue().getQueueCapacities().getUsedCapacity(
-        candidates.getPartition()) >= 1.0f
-        && preemptionManager.getKillableResource(
-        CapacitySchedulerConfiguration.ROOT, candidates.getPartition())
-        == Resources.none()) {
-      // Try to allocate from reserved containers
-      for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
-        RMContainer reservedContainer = node.getReservedContainer();
-        if (reservedContainer != null) {
-          allocateFromReservedContainer(node, false, reservedContainer);
-        }
+    // Try to allocate from reserved containers
+    for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
+      RMContainer reservedContainer = node.getReservedContainer();
+      if (reservedContainer != null) {
+        allocateFromReservedContainer(node, false, reservedContainer);
       }
-      LOG.debug("This partition '{}' doesn't have available or "
-          + "killable resource", candidates.getPartition());
-      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, null,
-          "", getRootQueue().getQueuePath(), ActivityState.REJECTED,
-          ActivityDiagnosticConstant.
-          INIT_CHECK_PARTITION_RESOURCE_INSUFFICIENT);
-      ActivitiesLogger.NODE
-          .finishSkippedNodeAllocation(activitiesManager, null);
-      return null;
     }
-
     return allocateOrReserveNewContainers(candidates, false);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
index bb2cbfd..a57a8ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
@@ -258,7 +258,7 @@ public void testExcessReservationWillBeUnreserved() throws Exception {
     Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
     Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
     Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
 
     // Trigger scheduling to allocate a container on nm1 for app2.
     cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java
new file mode 100644
index 0000000..09e3914
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSorter;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TestCapacitySchedulerMultiNodesWithPreemption
+    extends CapacitySchedulerTestBase {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestCapacitySchedulerMultiNodesWithPreemption.class);
+  private CapacitySchedulerConfiguration conf;
+  private static final String POLICY_CLASS_NAME =
+      "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement."
+ + "ResourceUsageMultiNodeLookupPolicy"; + + @Before + public void setUp() { + CapacitySchedulerConfiguration config = + new CapacitySchedulerConfiguration(); + config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + conf = new CapacitySchedulerConfiguration(config); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES, + "resource-based"); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME, + "resource-based"); + String policyName = + CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + + ".resource-based" + ".class"; + conf.set(policyName, POLICY_CLASS_NAME); + conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, + true); + // Set this to avoid the AM pending issue + conf.set(CapacitySchedulerConfiguration + .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, "1"); + conf.setInt("yarn.scheduler.minimum-allocation-mb", 512); + conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1); + conf.setInt("yarn.scheduler.maximum-allocation-mb", 102400); + + // Configure two queues to test Preemption + conf.set("yarn.scheduler.capacity.root.queues", "A, default"); + conf.set("yarn.scheduler.capacity.root.A.capacity", "50"); + conf.set("yarn.scheduler.capacity.root.default.capacity", "50"); + conf.set("yarn.scheduler.capacity.root.A.maximum-capacity", "100"); + conf.set("yarn.scheduler.capacity.root.default.maximum-capacity", "100"); + conf.set("yarn.scheduler.capacity.root.A.user-limit-factor", "10"); + conf.set("yarn.scheduler.capacity.root.default.user-limit-factor", "10"); + + // Configure Preemption + conf.setLong( + CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000); + conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, + 1500); + conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, + 1.0f); + conf.setFloat( + CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, + 1.0f); + + conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + ProportionalCapacityPreemptionPolicy.class.getCanonicalName()); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + } + + @Test(timeout=60000) + public void testAllocationOfReservationFromOtherNode() throws Exception { + MockRM rm = new MockRM(conf); + rm.start(); + MockNM[] nms = new MockNM[3]; + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 1 * GB, 2); + nms[0] = nm1; + MockNM nm2 = rm.registerNode("127.0.0.2:1234", 2 * GB, 2); + nms[1] = nm2; + MockNM nm3 = rm.registerNode("127.0.0.3:1234", 3 * GB, 2); + nms[2] = nm3; + + MultiNodeSortingManager mns = rm.getRMContext() + .getMultiNodeSortingManager(); + MultiNodeSorter sorter = mns + .getMultiNodePolicy(POLICY_CLASS_NAME); + sorter.reSortClusterNodes(); + + // Step 1: Launch an App in Default Queue which utilizes the entire cluster + RMApp app1 = MockRMAppSubmitter.submit(rm, + MockRMAppSubmissionData.Builder.createWithMemory(3 * GB, rm) + .withAppName("app-1") + .withUser("user1") + .withAcls(null) + .withQueue("default") + .build()); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + am1.allocateAndWaitForContainers("*", 1, 2 * GB, nm2); + am1.allocateAndWaitForContainers("*", 1, 1 * GB, nm3); + + // Step 2: Wait till the cluster is full + for (int i = 0; i < nms.length; i++) { + while (true) { + SchedulerNodeReport reportNM = + 
+            rm.getResourceScheduler().getNodeReport(nms[i].getNodeId());
+        if (reportNM.getAvailableResource().getMemorySize() == 0 * GB) {
+          break;
+        }
+      }
+    }
+    sorter.reSortClusterNodes();
+
+    // Step 3: Launch another App in Queue A which will be Reserved
+    // after Preemption
+    final AtomicBoolean result = new AtomicBoolean(false);
+    RMApp app2 = MockRMAppSubmitter.submit(rm,
+        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
+            .withAppName("app-2")
+            .withUser("user2")
+            .withAcls(null)
+            .withQueue("A")
+            .build());
+    Thread t1 = new Thread() {
+      public void run() {
+        try {
+          MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+          result.set(true);
+        } catch (Exception e) {
+          Assert.fail("Failed to launch app-2");
+        }
+      }
+    };
+    t1.start();
+    Thread.sleep(1000);
+
+    // Step 4: Wait for Preemption to happen. It will preempt Node1 (1GB)
+    // Get the node where preemption happened which has the available space
+    MockNM preemptedNode = null;
+    outer: while (true) {
+      for (int i = 0; i < nms.length; i++) {
+        SchedulerNodeReport reportNM =
+            rm.getResourceScheduler().getNodeReport(nms[i].getNodeId());
+        if (reportNM.getAvailableResource().getMemorySize() == 1 * GB) {
+          preemptedNode = nms[i];
+          break outer;
+        }
+      }
+      Thread.sleep(100);
+    }
+    LOG.info("Preempted node is: " + preemptedNode.getNodeId());
+
+    // Step 5: Don't release the container from NodeManager so that Reservation
+    // happens. Used Capacity will be < 1.0f but nodes won't have available
+    // containers so Reservation will happen.
+    FiCaSchedulerNode schedulerNode =
+        ((CapacityScheduler) rm.getResourceScheduler())
+            .getNodeTracker().getNode(preemptedNode.getNodeId());
+    Resource curResource = schedulerNode.getUnallocatedResource();
+    schedulerNode.deductUnallocatedResource(Resource.newInstance(curResource));
+
+    ((CapacityScheduler) rm.getResourceScheduler()).getNodeTracker()
+        .removeNode(preemptedNode.getNodeId());
+    ((CapacityScheduler) rm.getResourceScheduler()).getNodeTracker()
+        .addNode(schedulerNode);
+
+    // Send a heartbeat to kick the tires on the Scheduler
+    // The container will be reserved for app-2
+    RMNode preemptedRMNode = rm.getRMContext().getRMNodes().get(
+        preemptedNode.getNodeId());
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(
+        preemptedRMNode);
+    rm.getResourceScheduler().handle(nodeUpdate);
+
+    // Validate if Reservation happened
+    // Reservation will happen on last node in the iterator - Node3
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("A");
+
+    FiCaSchedulerApp schedulerApp = getFiCaSchedulerApp(leafQueue,
+        app2.getApplicationId());
+    assertEquals("App2 failed to get reserved container", 1,
+        schedulerApp.getReservedContainers().size());
+    LOG.info("Reserved node is: "
+        + schedulerApp.getReservedContainers().get(0).getReservedNode());
+    assertNotEquals("Failed to reserve as per the Multi Node Iterator",
+        schedulerApp.getReservedContainers().get(0).getReservedNode(),
+        preemptedNode.getNodeId());
+
+    // Step 6: Okay, now preempted node is Node1 and reserved node is Node3
+    // Validate if the Reserved Container gets allocated
+    // Free the space on Preempted Node1 and send heartbeat
+    schedulerNode = ((CapacityScheduler) rm.getResourceScheduler())
+        .getNodeTracker().getNode(preemptedNode.getNodeId());
+    curResource = schedulerNode.getAllocatedResource();
+    schedulerNode.addUnallocatedResource(Resource.newInstance(curResource));
+    ((CapacityScheduler) rm.getResourceScheduler()).getNodeTracker()
+        .removeNode(preemptedNode.getNodeId());
+    ((CapacityScheduler) rm.getResourceScheduler()).getNodeTracker()
+        .addNode(schedulerNode);
+
+    preemptedRMNode = rm.getRMContext().getRMNodes().get(
+        preemptedNode.getNodeId());
+    nodeUpdate = new NodeUpdateSchedulerEvent(preemptedRMNode);
+    rm.getResourceScheduler().handle(nodeUpdate);
+
+    // Step 7: Wait for app-2 to get ALLOCATED
+    while (!result.get()) {
+      Thread.sleep(100);
+    }
+
+    // Step 8: Validate if app-2 has got 1 live container and
+    // released the reserved container
+    schedulerApp = getFiCaSchedulerApp(leafQueue,
+        app2.getApplicationId());
+    assertEquals("App2 failed to get Allocated", 1,
+        schedulerApp.getLiveContainers().size());
+    assertEquals("App2 failed to Unreserve", 0,
+        schedulerApp.getReservedContainers().size());
+
+    rm.stop();
+  }
+
+  private static FiCaSchedulerApp getFiCaSchedulerApp(LeafQueue leafQueue,
+      ApplicationId appId) {
+    Iterator<FiCaSchedulerApp> apps = leafQueue.getApplications().iterator();
+    FiCaSchedulerApp schedulerApp = null;
+    while (apps.hasNext()) {
+      FiCaSchedulerApp cur = apps.next();
+      if (cur.getApplicationId().equals(appId)) {
+        schedulerApp = cur;
+        break;
+      }
+    }
+    assertNotNull("Failed to find application", schedulerApp);
+    return schedulerApp;
+  }
+}
\ No newline at end of file
-- 
2.7.4 (Apple Git-66)
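
For readers skimming the interleaved hunk in CapacityScheduler.java above: once this patch is applied, allocateContainersOnMultiNodes reduces to roughly the sketch below. It is assembled from the added lines of that hunk; allocateFromReservedContainer and allocateOrReserveNewContainers are existing helpers in the surrounding CapacityScheduler class. The point of the change is that the used-capacity / killable-resource pre-check no longer short-circuits multi-node scheduling, so reserved containers are always re-attempted before new allocation.

  private CSAssignment allocateContainersOnMultiNodes(
      CandidateNodeSet<FiCaSchedulerNode> candidates) {
    // Always retry containers already reserved on any candidate node first
    for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
      RMContainer reservedContainer = node.getReservedContainer();
      if (reservedContainer != null) {
        allocateFromReservedContainer(node, false, reservedContainer);
      }
    }
    // Then fall through to normal allocation/reservation across the candidates
    return allocateOrReserveNewContainers(candidates, false);
  }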