diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 0b6b8ef..c0114c0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -168,6 +168,15 @@ public void serviceInit(Configuration conf) throws Exception { return applications; } + public void getBlackListNodeIds(SchedulerApplicationAttempt app, + List blacklistNodeIdList) { + for (Map.Entry nodeEntry : nodes.entrySet()) { + if (SchedulerAppUtils.isBlacklisted(app, nodeEntry.getValue(), LOG)) { + blacklistNodeIdList.add(nodeEntry.getKey()); + } + } + } + @Override public Resource getClusterResource() { return clusterResource; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index e318d47..2c25eca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -27,6 +27,7 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -75,7 +76,7 @@ boolean pending = true; // for app metrics private ResourceUsage appResourceUsage; - + AtomicBoolean userBlacklistChanged = new AtomicBoolean(false); public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, long epoch, ResourceUsage appResourceUsage) { @@ -226,8 +227,10 @@ synchronized public boolean updateResourceRequests( */ public void updateBlacklist( List blacklistAdditions, List blacklistRemovals) { - updateUserOrAMBlacklist(userBlacklist, blacklistAdditions, - blacklistRemovals); + if (updateUserOrAMBlacklist(userBlacklist, blacklistAdditions, + blacklistRemovals)) { + userBlacklistChanged.set(true); + } } /** @@ -241,17 +244,25 @@ public void updateAMBlacklist( blacklistRemovals); } - void updateUserOrAMBlacklist(Set blacklist, + boolean updateUserOrAMBlacklist(Set blacklist, List blacklistAdditions, List blacklistRemovals) { + boolean changed = false; synchronized (blacklist) { if (blacklistAdditions != null) { - blacklist.addAll(blacklistAdditions); + changed = blacklist.addAll(blacklistAdditions); } if (blacklistRemovals != null) { - blacklist.removeAll(blacklistRemovals); + if (blacklist.removeAll(blacklistRemovals)) { + changed = true; + } } } + return changed; + } + + public boolean getAndResetBlacklistChanged() { + return userBlacklistChanged.getAndSet(false); } synchronized public Collection getPriorities() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index cfec915..dac4213 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; @@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -78,6 +80,7 @@ private RMContainerComparator comparator = new RMContainerComparator(); private final Map preemptionMap = new HashMap(); + private List blacklistNodeIds = new ArrayList(); /** * Delay scheduling: We often want to prioritize scheduling of node-local * containers over rack-local or off-switch containers. To achieve this @@ -189,6 +192,24 @@ public Resource getHeadroom() { Resource clusterAvailableResources = Resources.subtract(clusterResource, clusterUsage); + if (appSchedulingInfo.getAndResetBlacklistChanged()) { + blacklistNodeIds.clear(); + scheduler.getBlackListNodeIds(this, blacklistNodeIds); + } + for (NodeId nodeId: blacklistNodeIds) { + SchedulerNode node = scheduler.getSchedulerNode(nodeId); + if (node != null) { + Resources.subtractFrom(clusterAvailableResources, + node.getAvailableResource()); + } + } + if (clusterAvailableResources.getMemory() < 0) { + clusterAvailableResources.setMemory(0); + } + if (clusterAvailableResources.getVirtualCores() < 0) { + clusterAvailableResources.setVirtualCores(0); + } + Resource queueMaxAvailableResources = Resources.subtract(queue.getMaxShare(), queueUsage); Resource maxAvailableResource = Resources.componentwiseMin( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java new file mode 100644 index 0000000..26153cd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.doReturn; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; +import org.junit.Assert; +import org.junit.Test; + +public class TestAppSchedulingInfo { + + private static final Log LOG = LogFactory.getLog(TestAppSchedulingInfo.class); + + @Test + public void testBacklistChanged() { + ApplicationId appIdImpl = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appIdImpl, 1); + + FSLeafQueue queue = mock(FSLeafQueue.class); + doReturn("test").when(queue).getQueueName(); + AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo( + appAttemptId, "test", queue, null, 0, new ResourceUsage()); + + appSchedulingInfo.updateBlacklist(new ArrayList(), + new ArrayList()); + Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged()); + + ArrayList blacklistAdditions = new ArrayList(); + blacklistAdditions.add("node1"); + blacklistAdditions.add("node2"); + appSchedulingInfo.updateBlacklist(blacklistAdditions, + new ArrayList()); + Assert.assertTrue(appSchedulingInfo.getAndResetBlacklistChanged()); + + blacklistAdditions.clear(); + blacklistAdditions.add("node1"); + appSchedulingInfo.updateBlacklist(blacklistAdditions, + new ArrayList()); + Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged()); + + ArrayList blacklistRemovals = new ArrayList(); + blacklistRemovals.add("node1"); + appSchedulingInfo.updateBlacklist(new ArrayList(), + blacklistRemovals); + appSchedulingInfo.updateBlacklist(new ArrayList(), + blacklistRemovals); + Assert.assertTrue(appSchedulingInfo.getAndResetBlacklistChanged()); + + appSchedulingInfo.updateBlacklist(new ArrayList(), + blacklistRemovals); + Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged()); + } +}