diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index b23ec3e..04d9e1c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -537,6 +537,14 @@ private Resource assignContainer( } } + private boolean anyNodeLocalRequests(Priority priority) { + return anyLocalRequests(priority); + } + + private boolean anyLocalRequests(Priority priority) { + return getResourceRequests(priority).size() > 1; + } + private Resource assignContainer(FSSchedulerNode node, boolean reserved) { if (LOG.isDebugEnabled()) { LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved); @@ -565,16 +573,6 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { } } - ResourceRequest rackLocalRequest = getResourceRequest(priority, - node.getRackName()); - ResourceRequest localRequest = getResourceRequest(priority, - node.getNodeName()); - - if (localRequest != null && !localRequest.getRelaxLocality()) { - LOG.warn("Relax locality off is not supported on local request: " - + localRequest); - } - NodeType allowedLocality; if (scheduler.isContinuousSchedulingEnabled()) { allowedLocality = getAllowedLocalityLevelByTime(priority, @@ -588,33 +586,55 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { scheduler.getRackLocalityThreshold()); } - if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 - && localRequest != null && localRequest.getNumContainers() != 0) { - return assignContainer(node, localRequest, - NodeType.NODE_LOCAL, reserved); - } + // Handle any node-local requests in the application + ResourceRequest localRequest = getResourceRequest(priority, + node.getNodeName()); + if (localRequest != null) { + if (!localRequest.getRelaxLocality()) { + LOG.warn("Relax locality off is not supported on local request: " + + localRequest); + } - if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) { - continue; + if (localRequest.getNumContainers() > 0) { + return assignContainer( + node, localRequest, NodeType.NODE_LOCAL, reserved); + } } - if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 - && (allowedLocality.equals(NodeType.RACK_LOCAL) || - allowedLocality.equals(NodeType.OFF_SWITCH))) { - return assignContainer(node, rackLocalRequest, - NodeType.RACK_LOCAL, reserved); + // Handle any rack-local requests if (1) there are no node-local + // requests or (2) the allowed locality level allows rack-local requests + ResourceRequest rackLocalRequest = getResourceRequest(priority, + node.getRackName()); + if (rackLocalRequest != null) { + if (!rackLocalRequest.getRelaxLocality()) { + continue; + } + + if (rackLocalRequest.getNumContainers() > 0 && + (!anyNodeLocalRequests(priority) + || allowedLocality.equals(NodeType.RACK_LOCAL) + || allowedLocality.equals(NodeType.OFF_SWITCH))) { + return assignContainer(node, rackLocalRequest, + NodeType.RACK_LOCAL, reserved); + } } + // Handle any off-switch requests if (1) there are no node-local or + // rack-local requests or (2) the allowed locality level allows + // off-switch requests ResourceRequest offSwitchRequest = getResourceRequest(priority, ResourceRequest.ANY); - if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) { - continue; - } + if (offSwitchRequest != null) { + if (!offSwitchRequest.getRelaxLocality()) { + continue; + } - if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0 - && allowedLocality.equals(NodeType.OFF_SWITCH)) { - return assignContainer(node, offSwitchRequest, - NodeType.OFF_SWITCH, reserved); + if (offSwitchRequest.getNumContainers() > 0 && + (!anyLocalRequests(priority) + || allowedLocality.equals(NodeType.OFF_SWITCH))) { + return assignContainer(node, offSwitchRequest, + NodeType.OFF_SWITCH, reserved); + } } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java new file mode 100644 index 0000000..c7a0e55 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.lib.MutableRate; +import org.apache.hadoop.metrics2.util.SampleStat; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class TestContinuousScheduling extends FairSchedulerTestBase { + private MockClock mockClock; + + @Override + public Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + conf.setBoolean( + FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true); + conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS, 100); + conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS, 100); + return conf; + } + + @Before + public void setup() { + mockClock = new MockClock(); + conf = createConfiguration(); + resourceManager = new MockRM(conf); + resourceManager.start(); + + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + scheduler.setClock(mockClock); + + assertTrue(scheduler.isContinuousSchedulingEnabled()); + assertEquals( + FairSchedulerConfiguration.DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS, + scheduler.getContinuousSchedulingSleepMs()); + assertEquals(mockClock, scheduler.getClock()); + } + + @After + public void teardown() { + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + } + + @Test (timeout = 60000) + public void testSchedulingDelay() throws InterruptedException { + // Add one node + String host = "127.0.0.1"; + RMNode node1 = MockNodes.newNodeInfo( + 1, Resources.createResource(4096, 4), 1, host); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdateEvent); + + // Create one application and submit one each of node-local, rack-local + // and ANY requests + ApplicationAttemptId appAttemptId = + createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); + scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false); + scheduler.addApplicationAttempt(appAttemptId, false, false); + List ask = new ArrayList<>(); + ask.add(createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true)); + scheduler.allocate( + appAttemptId, ask, new ArrayList(), null, null); + FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); + + // Advance time and let continuous scheduling kick in + mockClock.tick(1); + while (1024 != app.getCurrentConsumption().getMemory()) { + Thread.sleep(100); + } + assertEquals(1024, app.getCurrentConsumption().getMemory()); + } +}