diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
index 14e2645..71bb2d2 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
@@ -882,6 +882,11 @@ public int getNumClusterNodes() {
   }
 
   @Override
+  public int getNumClusterHosts() {
+    return scheduler.getNumClusterHosts();
+  }
+
+  @Override
   public SchedulerNodeReport getNodeReport(NodeId nodeId) {
     return scheduler.getNodeReport(nodeId);
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6c438f2..2d386bf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1967,6 +1967,15 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) {
         NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE));
   }
 
+  public static final String AM_BLACKLISTING_ENABLED =
+      YARN_PREFIX + "am.blacklisting.enabled";
+  public static final boolean DEFAULT_AM_BLACKLISTING_ENABLED = true;
+
+  public static final String AM_BLACKLISTING_DISABLE_THRESHOLD =
+      YARN_PREFIX + "am.blacklisting.disable-failure-threshold";
+  public static final float DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD = 0.8f;
+
+
   public YarnConfiguration() {
     super();
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 53face0..d5c4988 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2179,4 +2179,22 @@
     <value>0</value>
   </property>
 
+  <property>
+    <description>
+    Enable/disable blacklisting of hosts for AM based on AM failures on those
+    hosts.
+    </description>
+    <name>yarn.am.blacklisting.enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <description>
+    Threshold of the ratio of NodeManager hosts that may be blacklisted for
+    the AM. Beyond this ratio blacklisting is disabled, to avoid the danger
+    of blacklisting the entire cluster.
+    </description>
+    <name>yarn.am.blacklisting.disable-failure-threshold</name>
+    <value>0.8f</value>
+  </property>
 </configuration>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java
new file mode 100644
index 0000000..7062669
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+import java.util.List;
+
+/**
+ * Tracks blacklists based on failures reported on nodes.
+ */
+public interface BlacklistManager {
+
+  /**
+   * Report the failure of a container on a node.
+   * @param node the node that has a container failure
+   */
+  void addNodeContainerFailure(String node);
+
+  /**
+   * Get the current blacklist.
+   * @return the blacklisted nodes.
+   */
+  List<String> getBlacklist();
+
+  /**
+   * Refresh the number of NodeManager hosts available for scheduling.
+   * @param nodeHostCount the number of node hosts.
+   */
+  void refreshNodeHostCount(int nodeHostCount);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java
new file mode 100644
index 0000000..d79d080
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link BlacklistManager} that always returns an empty blacklist.
+ */
+public class DisabledBlacklistManager implements BlacklistManager {
+
+  private static final List<String> noBlacklist = new ArrayList<String>();
+
+  @Override
+  public void addNodeContainerFailure(String node) {
+  }
+
+  @Override
+  public List<String> getBlacklist() {
+    return noBlacklist;
+  }
+
+  @Override
+  public void refreshNodeHostCount(int nodeHostCount) {
+    // Do nothing
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java
new file mode 100644
index 0000000..0a3dd7e
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Maintains a list of nodes with reported failures and returns it as long as
+ * the number of blacklisted nodes stays below a threshold fraction of the
+ * total node hosts. Once the threshold is crossed, an empty blacklist is
+ * returned instead, so the earlier additions are effectively reverted.
+ */
+public class SimpleBlacklistManager implements BlacklistManager {
+
+  private int numberOfNodeManagerHosts;
+  private final double blacklistDisableFailureThreshold;
+  private final Set<String> blacklistNodes = new HashSet<>();
+  private static final ArrayList<String> EMPTY_LIST = new ArrayList<>();
+
+  private static final Log LOG = LogFactory.getLog(SimpleBlacklistManager.class);
+
+  public SimpleBlacklistManager(int numberOfNodeManagerHosts,
+      double blacklistDisableFailureThreshold) {
+    this.numberOfNodeManagerHosts = numberOfNodeManagerHosts;
+    this.blacklistDisableFailureThreshold = blacklistDisableFailureThreshold;
+  }
+
+  @Override
+  public void addNodeContainerFailure(String node) {
+    blacklistNodes.add(node);
+  }
+
+  @Override
+  public void refreshNodeHostCount(int nodeHostCount) {
+    this.numberOfNodeManagerHosts = nodeHostCount;
+  }
+
+  @Override
+  public List<String> getBlacklist() {
+    List<String> ret;
+    List<String> blacklist = new ArrayList<>(blacklistNodes);
+    final int currentBlacklistSize = blacklist.size();
+    final double failureThreshold = this.blacklistDisableFailureThreshold *
+        numberOfNodeManagerHosts;
+    if (currentBlacklistSize < failureThreshold) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("blacklist size " + currentBlacklistSize + " is less than " +
+            "failure threshold ratio " + blacklistDisableFailureThreshold +
+            " out of total usable nodes " + numberOfNodeManagerHosts);
+      }
+      ret = blacklist;
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("blacklist size " + currentBlacklistSize + " is more than " +
+            "failure threshold ratio " + blacklistDisableFailureThreshold +
+            " out of total usable nodes " + numberOfNodeManagerHosts);
+      }
+      ret = EMPTY_LIST;
+    }
+    return ret;
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 42ff1de..ededab5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -74,6 +74,9 @@
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -133,6 +136,8 @@
   private final Set<String> applicationTags;
 
   private final long attemptFailuresValidityInterval;
+  private final boolean amBlacklistingEnabled;
+  private final float blacklistDisableThreshold;
 
   private Clock systemClock;
 
@@ -447,7 +452,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
     }
 
     this.logAggregationEnabled =
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); if (this.logAggregationEnabled) { this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START; } else { @@ -456,6 +461,14 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, maxLogAggregationDiagnosticsInMemory = conf.getInt( YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY, YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY); + + amBlacklistingEnabled = conf.getBoolean( + YarnConfiguration.AM_BLACKLISTING_ENABLED, + YarnConfiguration.DEFAULT_AM_BLACKLISTING_ENABLED); + + blacklistDisableThreshold = conf.getFloat( + YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD, + YarnConfiguration.DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD); } @Override @@ -796,6 +809,18 @@ public void recover(RMState state) { private void createNewAttempt() { ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1); + + BlacklistManager currentAMBlacklist = null; + if (currentAttempt != null) { + currentAMBlacklist = currentAttempt.getAMBlacklist(); + } else { + if (amBlacklistingEnabled) { + currentAMBlacklist = new SimpleBlacklistManager( + scheduler.getNumClusterHosts(), blacklistDisableThreshold); + } else { + currentAMBlacklist = new DisabledBlacklistManager(); + } + } RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, submissionContext, conf, @@ -803,7 +828,8 @@ private void createNewAttempt() { // previously failed attempts(which should not include Preempted, // hardware error and NM resync) + 1) equal to the max-attempt // limit. - maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq); + maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq, + currentAMBlacklist); attempts.put(appAttemptId, attempt); currentAttempt = attempt; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index b85174e..4dd8345 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; /** @@ -185,6 +186,12 @@ ApplicationResourceUsageReport getApplicationResourceUsageReport(); /** + * Get the {@link BlacklistManager} that manages blacklists for AM failures + * @return the {@link BlacklistManager} that tracks AM failures. + */ + BlacklistManager getAMBlacklist(); + + /** * the start time of the application. * @return the start time of the application. 
*/ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 74a4000..67039b8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -36,7 +36,6 @@ import javax.crypto.SecretKey; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -71,6 +70,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -182,6 +183,7 @@ private RMAppAttemptMetrics attemptMetrics = null; private ResourceRequest amReq = null; + private BlacklistManager blacklistedNodesForAM = null; private static final StateMachineFactory EMPTY_CONTAINER_REQUEST_LIST = new ArrayList(); + private static final List EMPTY_SYSTEM_BLACKLIST = new ArrayList<>(); + @VisibleForTesting public static final class ScheduleTransition implements @@ -939,7 +954,16 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY); appAttempt.amReq.setResourceName(ResourceRequest.ANY); appAttempt.amReq.setRelaxLocality(true); - + + appAttempt.getAMBlacklist().refreshNodeHostCount( + appAttempt.scheduler.getNumClusterHosts()); + + List blacklist = appAttempt.getAMBlacklist().getBlacklist(); + if (LOG.isDebugEnabled()) { + LOG.debug("Setting system Blacklist for launching AM " + blacklist); + } + appAttempt.scheduler.updateSystemBlacklist( + appAttempt.getAppAttemptId().getApplicationId(), blacklist); // AM resource has been checked when submission Allocation amContainerAllocation = appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, @@ -1307,6 +1331,18 @@ public void transition(RMAppAttemptImpl appAttempt, // Register with AMLivelinessMonitor appAttempt.attemptLaunched(); + // Remove the blacklist added by AM blacklist from shared blacklist + // only for managed AM. 
+ if(!appAttempt.submissionContext.getUnmanagedAM()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Removing system blacklist from application " + + appAttempt.getAppAttemptId().getApplicationId()); + } + appAttempt.scheduler.updateSystemBlacklist( + appAttempt.applicationAttemptId.getApplicationId(), + EMPTY_SYSTEM_BLACKLIST); + } + // register the ClientTokenMasterKey after it is saved in the store, // otherwise client may hold an invalid ClientToken after RM restarts. if (UserGroupInformation.isSecurityEnabled()) { @@ -1694,6 +1730,7 @@ private void sendFinishedContainersToNM() { private void sendAMContainerToNM(RMAppAttemptImpl appAttempt, RMAppAttemptContainerFinishedEvent containerFinishedEvent) { NodeId nodeId = containerFinishedEvent.getNodeId(); + appAttempt.addAMNodeToBlackList(containerFinishedEvent.getNodeId()); finishedContainersSentToAM.putIfAbsent(nodeId, new ArrayList()); appAttempt.finishedContainersSentToAM.get(nodeId).add( @@ -1708,6 +1745,15 @@ private void sendAMContainerToNM(RMAppAttemptImpl appAttempt, } } + private void addAMNodeToBlackList(NodeId nodeId) { + blacklistedNodesForAM.addNodeContainerFailure(nodeId.getHost().toString()); + } + + @Override + public BlacklistManager getAMBlacklist() { + return blacklistedNodesForAM; + } + private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt, RMAppAttemptContainerFinishedEvent containerFinishedEvent) { appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index ed05189..6468cbe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -65,6 +65,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @@ -98,7 +100,12 @@ private long configuredMaximumAllocationWaitTime; protected RMContext rmContext; - + + private int numClusterHosts = -1; + private long lastNumClusterHostsCheck = 0; + public final long numClusterHostsUpdateInterval = 60 * 1000; // 1 min + protected Clock clock; + /* * All schedulers which are inheriting AbstractYarnScheduler should use * concurrent version of 'applications' map. 
@@ -117,10 +124,29 @@ * @param name service name */ public AbstractYarnScheduler(String name) { + this(name, new SystemClock()); + } + + public AbstractYarnScheduler(String name, Clock clock) { super(name); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.maxAllocReadLock = lock.readLock(); this.maxAllocWriteLock = lock.writeLock(); + this.clock = clock; + } + + @Override + public int getNumClusterHosts() { + long now = clock.getTime(); + if (now >= lastNumClusterHostsCheck + numClusterHostsUpdateInterval) { + lastNumClusterHostsCheck = now; + Set hosts = new HashSet<>(); + for (NodeId node : this.nodes.keySet()) { + hosts.add(node.getHost()); + } + numClusterHosts = hosts.size(); + } + return numClusterHosts; } @Override @@ -236,6 +262,20 @@ public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { } @Override + public void updateSystemBlacklist(ApplicationId appId, + List blacklistedNodes) { + SchedulerApplication app = + applications.get(appId); + if (app != null) { + final SchedulerApplicationAttempt currentAppAttempt = + app.getCurrentAppAttempt(); + if (currentAppAttempt != null) { + currentAppAttempt.updateSystemBlacklist(blacklistedNodes); + } + } + } + + @Override public SchedulerAppReport getSchedulerAppInfo( ApplicationAttemptId appAttemptId) { SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 77ac5b3..b1b29b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -65,7 +65,8 @@ new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator()); final Map> requests = new ConcurrentHashMap>(); - private Set blacklist = new HashSet(); + private Set userBlacklist = new HashSet(); + private Set systemBlacklist = new HashSet(); //private final ApplicationStore store; private ActiveUsersManager activeUsersManager; @@ -217,24 +218,28 @@ synchronized public boolean updateResourceRequests( } /** - * The ApplicationMaster is updating the blacklist + * The ApplicationMaster is updating the userBlacklist * - * @param blacklistAdditions resources to be added to the blacklist - * @param blacklistRemovals resources to be removed from the blacklist + * @param blacklistAdditions resources to be added to the userBlacklist + * @param blacklistRemovals resources to be removed from the userBlacklist */ synchronized public void updateBlacklist( List blacklistAdditions, List blacklistRemovals) { - // Add to blacklist + // Add to userBlacklist if (blacklistAdditions != null) { - blacklist.addAll(blacklistAdditions); + userBlacklist.addAll(blacklistAdditions); } - // Remove from blacklist + // Remove from userBlacklist if (blacklistRemovals != null) { - blacklist.removeAll(blacklistRemovals); + userBlacklist.removeAll(blacklistRemovals); } } + synchronized public void updateSystemBlacklist(List blacklist) { + systemBlacklist = new 
HashSet<>(blacklist); + } + synchronized public Collection getPriorities() { return priorities; } @@ -263,8 +268,14 @@ public synchronized Resource getResource(Priority priority) { return (request == null) ? null : request.getCapability(); } + /** + * Returns if the node is either blacklisted by the user or the system + * @param resourceName the resourcename + * @return true if its blacklisted + */ public synchronized boolean isBlacklisted(String resourceName) { - return blacklist.contains(resourceName); + return userBlacklist.contains(resourceName) || + systemBlacklist.contains(resourceName); } /** @@ -474,18 +485,18 @@ public synchronized void setQueue(Queue queue) { } public synchronized Set getBlackList() { - return this.blacklist; + return this.userBlacklist; } public synchronized Set getBlackListCopy() { - return new HashSet<>(this.blacklist); + return new HashSet<>(this.userBlacklist); } public synchronized void transferStateFromPreviousAppSchedulingInfo( AppSchedulingInfo appInfo) { // this.priorities = appInfo.getPriorities(); // this.requests = appInfo.getRequests(); - this.blacklist = appInfo.getBlackList(); + this.userBlacklist = appInfo.getBlackList(); } public synchronized void recoverContainer(RMContainer rmContainer) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 4872543..27d4da1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -516,7 +516,14 @@ public synchronized void updateBlacklist( blacklistAdditions, blacklistRemovals); } } - + + public synchronized void updateSystemBlacklist( + List blacklistedNodes) { + if (!isStopped) { + this.appSchedulingInfo.updateSystemBlacklist(blacklistedNodes); + } + } + public boolean isBlacklisted(String resourceName) { return this.appSchedulingInfo.isBlacklisted(resourceName); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 0fa23e1..849a022 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -120,7 +120,16 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, @Public @Stable public int getNumClusterNodes(); - + + + /** + * Get the number of unique node hosts available in the cluster. + * @return the number of unique node hosts. 
+ */ + @Public + @Stable + public int getNumClusterHosts(); + /** * The main api between the ApplicationMaster and the Scheduler. * The ApplicationMaster is updating his future resource requirements @@ -223,6 +232,14 @@ public String moveApplication(ApplicationId appId, String newQueue) throws YarnException; /** + * Updates the set of nodes blacklisted by System (RM) as opposed to the User + * @param appId The application Id for which to update the system blacklist + * @param blacklistedNodes the list of nodes blacklisted + */ + public void updateSystemBlacklist( + ApplicationId appId, List blacklistedNodes); + + /** * Completely drain sourceQueue of applications, by moving all of them to * destQueue. * diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 5080355..e464401 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -750,10 +750,7 @@ private static void waitForSchedulerAppAttemptAdded( public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) throws Exception { - rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); - RMAppAttempt attempt = app.getCurrentAppAttempt(); - waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); - rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED); + RMAppAttempt attempt = waitForAttemptScheduled(app, rm); System.out.println("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); @@ -761,6 +758,15 @@ public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) return am; } + public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm) + throws Exception { + rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); + RMAppAttempt attempt = app.getCurrentAppAttempt(); + waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); + rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED); + return attempt; + } + public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm) throws Exception { MockAM am = launchAM(app, rm, nm); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index d579595..779b38c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -35,8 +35,12 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import 
org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -49,11 +53,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.SystemClock; @@ -82,21 +88,7 @@ public void testAMRestartWithExistingContainers() throws Exception { MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); int NUM_CONTAINERS = 3; - // allocate NUM_CONTAINERS containers - am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS, - new ArrayList()); - nm1.nodeHeartbeat(true); - - // wait for containers to be allocated. - List containers = - am1.allocate(new ArrayList(), - new ArrayList()).getAllocatedContainers(); - while (containers.size() != NUM_CONTAINERS) { - nm1.nodeHeartbeat(true); - containers.addAll(am1.allocate(new ArrayList(), - new ArrayList()).getAllocatedContainers()); - Thread.sleep(200); - } + allocateContainers(nm1, am1, NUM_CONTAINERS); // launch the 2nd container, for testing running container transferred. nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); @@ -244,6 +236,29 @@ public void testAMRestartWithExistingContainers() throws Exception { rm1.stop(); } + private List allocateContainers(MockNM nm1, MockAM am1, + int NUM_CONTAINERS) throws Exception { + // allocate NUM_CONTAINERS containers + am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS, + new ArrayList()); + nm1.nodeHeartbeat(true); + + // wait for containers to be allocated. 
+ List containers = + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (containers.size() != NUM_CONTAINERS) { + nm1.nodeHeartbeat(true); + containers.addAll(am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(200); + } + + Assert.assertEquals("Did not get all containers allocated", + NUM_CONTAINERS, containers.size()); + return containers; + } + private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt) throws InterruptedException { int count = 0; @@ -258,6 +273,9 @@ private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt) public void testNMTokensRebindOnAMRestart() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3); + // To prevent test from blacklisting nm1 for AM, we sit threshold to half + // of 2 nodes which is 1 + conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD, 0.5f); MockRM rm1 = new MockRM(conf); rm1.start(); @@ -355,6 +373,104 @@ public void testNMTokensRebindOnAMRestart() throws Exception { rm1.stop(); } + @Test(timeout = 100000) + public void testAMBlacklistPreventsRestartOnSameNode() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + final DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm1 = new MockRM(conf, memStore) { + @Override + protected EventHandler createSchedulerEventDispatcher() { + return new SchedulerEventDispatcher(this.scheduler) { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } + + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + + rm1.start(); + + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + + MockNM nm2 = + new MockNM("127.0.0.2:2345", 8000, rm1.getResourceTrackerService()); + nm2.registerNode(); + + RMApp app1 = rm1.submitApp(200); + + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + CapacityScheduler scheduler = + (CapacityScheduler) rm1.getResourceScheduler(); + ContainerId amContainer = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + // Preempt the first attempt; + RMContainer rmContainer = scheduler.getRMContainer(amContainer); + NodeId nodeWhereAMRan = rmContainer.getAllocatedNode(); + + scheduler.killContainer(rmContainer); + am1.waitForState(RMAppAttemptState.FAILED); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // restart the am + RMAppAttempt attempt = rm1.waitForAttemptScheduled(app1, rm1); + System.out.println("Launch AM " + attempt.getAppAttemptId()); + + MockNM currentNode, otherNode; + + if (nodeWhereAMRan == nm1.getNodeId()) { + currentNode = nm1; + otherNode = nm2; + } else { + currentNode = nm2; + otherNode = nm1; + } + + currentNode.nodeHeartbeat(true); + dispatcher.await(); + Assert.assertEquals( + "AppAttemptState should still be SCHEDULED if currentNode is " + + "blacklisted correctly", + RMAppAttemptState.SCHEDULED, + attempt.getAppAttemptState()); + + otherNode.nodeHeartbeat(true); + dispatcher.await(); + + MockAM am2 = rm1.sendAMLaunched(attempt.getAppAttemptId()); + rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); + + amContainer = + 
ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); + rmContainer = scheduler.getRMContainer(amContainer); + nodeWhereAMRan = rmContainer.getAllocatedNode(); + Assert.assertEquals( + "After blacklisting AM should have run on the other node", + otherNode.getNodeId(), nodeWhereAMRan); + + RegisterApplicationMasterResponse registerResponse = + am2.registerAppAttempt(); + rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + List allocatedContainers = + allocateContainers(currentNode, am2, 1); + Assert.assertEquals( + "Even though AM is blacklisted from the node, application can still " + + "allocate containers there", + currentNode.getNodeId(), allocatedContainers.get(0).getNodeId()); + } + // AM container preempted, nm disk failure // should not be counted towards AM max retry count. @Test(timeout = 100000) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java new file mode 100644 index 0000000..db2ebad --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.blacklist; + + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +public class TestBlacklistManager { + + @Test + public void testSimpleBlacklistBelowFailureThreshold() { + final int numberOfNodeManagerHosts = 3; + final double blacklistDisableFailureThreshold = 0.8; + BlacklistManager manager = new SimpleBlacklistManager( + numberOfNodeManagerHosts, blacklistDisableFailureThreshold); + String anyNode = "foo"; + String anyNode2 = "bar"; + manager.addNodeContainerFailure(anyNode); + manager.addNodeContainerFailure(anyNode2); + List blacklist = manager.getBlacklist(); + Collections.sort(blacklist); + + String[] expectedBlacklist = new String[]{anyNode2, anyNode}; + Assert.assertArrayEquals( + "Blacklist was not as expected", + expectedBlacklist, + blacklist.toArray()); + } + + @Test + public void testSimpleBlacklistAboveFailureThreshold() { + // Create a threshold of 0.5 * 3 i.e at 1.5 node failures. 
+ BlacklistManager manager = new SimpleBlacklistManager(3, 0.5); + String anyNode = "foo"; + String anyNode2 = "bar"; + manager.addNodeContainerFailure(anyNode); + List blacklist = manager.getBlacklist(); + Collections.sort(blacklist); + + String[] expectedBlacklist = new String[]{anyNode}; + Assert.assertArrayEquals( + "Blacklist was not as expected", + expectedBlacklist, + blacklist.toArray()); + + manager.addNodeContainerFailure(anyNode2); + + blacklist = manager.getBlacklist(); + Assert.assertTrue( + "Blacklist should be empty but was " + + blacklist, + blacklist.isEmpty()); + } + + @Test + public void testDisabledBlacklist() { + BlacklistManager disabled = new DisabledBlacklistManager(); + String anyNode = "foo"; + disabled.addNodeContainerFailure(anyNode); + List blacklist = disabled.getBlacklist(); + + Assert.assertTrue( + "Blacklist should be empty but was " + + blacklist, + blacklist.isEmpty()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index fccfa19..484a1b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -489,7 +489,7 @@ private RMApp createRMApp(Configuration conf) { 2, Resource.newInstance(10, 2), "test"); return new RMAppImpl(this.appId, this.rmContext, conf, "test", "test", "default", submissionContext, - this.rmContext.getScheduler(), + scheduler, this.rmContext.getApplicationMasterService(), System.currentTimeMillis(), "test", null, null); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 2e64d61..a5e3308 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -970,7 +970,7 @@ public void testRecoverApplication(ApplicationStateData appState, appState.getApplicationSubmissionContext().getApplicationId(), rmContext, conf, submissionContext.getApplicationName(), null, - submissionContext.getQueue(), submissionContext, null, null, + submissionContext.getQueue(), submissionContext, scheduler, null, appState.getSubmitTime(), submissionContext.getApplicationType(), submissionContext.getApplicationTags(), BuilderUtils.newResourceRequest( diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index ffd1c1f..add619e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -27,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.Container; @@ -49,8 +51,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; @@ -493,6 +497,30 @@ public void testResourceRequestRestoreWhenRMContainerIsAtAllocated() } } + @Test + public void testNumClusterHostsUpdate() { + @SuppressWarnings("unchecked") + AbstractYarnScheduler test = mock(AbstractYarnScheduler.class); + + final ControlledClock testClock = new ControlledClock(); + test.clock = testClock; + testClock.setTime(test.numClusterHostsUpdateInterval); + doCallRealMethod().when(test).getNumClusterHosts(); + + test.nodes = new ConcurrentHashMap<>(); + FiCaSchedulerNode mockNode = mock(FiCaSchedulerNode.class); + test.nodes.put(NodeId.newInstance("host1", 1), mockNode); + test.nodes.put(NodeId.newInstance("host1", 2), mockNode); + test.nodes.put(NodeId.newInstance("host2", 1), mockNode); + Assert.assertEquals(2, test.getNumClusterHosts()); + + // Still return old value before we move beyond check interval + test.nodes.put(NodeId.newInstance("host3", 1), mockNode); + Assert.assertEquals(2, test.getNumClusterHosts()); + testClock.setTime(testClock.getTime() + test.numClusterHostsUpdateInterval); + Assert.assertEquals(3, test.getNumClusterHosts()); + } + private void verifyMaximumResourceCapability( Resource expectedMaximumResource, AbstractYarnScheduler scheduler) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 403c8ea..1c9801d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -220,7 +220,7 @@ protected void createApplicationWithAMResource(ApplicationAttemptId attId,
     ApplicationId appId = attId.getApplicationId();
     RMApp rmApp = new RMAppImpl(appId, rmContext, conf, null, user, null,
         ApplicationSubmissionContext.newInstance(appId, null,
-        queue, null, null, false, false, 0, amResource, null), null, null,
+        queue, null, null, false, false, 0, amResource, null), scheduler, null,
         0, null, null, null);
     rmContext.getRMApps().put(appId, rmApp);
     RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START);
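Note (not part of the patch): the sketch below is a minimal, standalone illustration of how the new SimpleBlacklistManager behaves around the disable threshold, mirroring TestBlacklistManager above. The host strings and the class name BlacklistThresholdExample are made up for illustration.

import java.util.List;

import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;

public class BlacklistThresholdExample {
  public static void main(String[] args) {
    // 3 NodeManager hosts with a 0.5 threshold: blacklisting stays active
    // while fewer than 0.5 * 3 = 1.5 hosts have reported failures.
    BlacklistManager manager = new SimpleBlacklistManager(3, 0.5);

    manager.addNodeContainerFailure("host1");
    List<String> blacklist = manager.getBlacklist();
    System.out.println(blacklist); // [host1], one failure is below 1.5

    manager.addNodeContainerFailure("host2");
    blacklist = manager.getBlacklist();
    System.out.println(blacklist); // [], two failures cross 1.5, so blacklisting is disabled
  }
}

This is the same pattern RMAppImpl relies on: when blacklisting is enabled it hands each new attempt a SimpleBlacklistManager sized by scheduler.getNumClusterHosts(), and a DisabledBlacklistManager otherwise.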
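Note (not part of the patch): the two new configuration keys can also be set programmatically, as TestAMRestart does with setFloat. A small sketch assuming a client-side YarnConfiguration; the 0.25 threshold and the class name are example values only.

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class AMBlacklistConfigExample {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();

    // Keep AM blacklisting on (true is already the default added by this patch).
    conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);

    // Stop blacklisting once 25% of the NodeManager hosts have AM failures.
    conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD, 0.25f);

    System.out.println(conf.getFloat(
        YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
        YarnConfiguration.DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD));
  }
}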
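Note (not part of the patch): the AppSchedulingInfo change keeps two blacklists, a user blacklist driven by the ApplicationMaster and a system blacklist driven by the RM for AM placement, and a resource is treated as blacklisted if it appears in either. The class below is a simplified, hypothetical model of that check; DualBlacklist is an illustrative name, not the real AppSchedulingInfo.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative model only; the real logic lives in AppSchedulingInfo.
class DualBlacklist {
  private final Set<String> userBlacklist = new HashSet<>();
  private Set<String> systemBlacklist = new HashSet<>();

  // AM-driven updates add and remove individual entries.
  void updateUserBlacklist(List<String> additions, List<String> removals) {
    if (additions != null) {
      userBlacklist.addAll(additions);
    }
    if (removals != null) {
      userBlacklist.removeAll(removals);
    }
  }

  // RM-driven updates replace the system blacklist wholesale, which is how
  // the AM blacklist is cleared again once the AM container has launched.
  void updateSystemBlacklist(List<String> blacklist) {
    systemBlacklist = new HashSet<>(blacklist);
  }

  boolean isBlacklisted(String resourceName) {
    return userBlacklist.contains(resourceName)
        || systemBlacklist.contains(resourceName);
  }
}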