diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
index 14e2645..71bb2d2 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
@@ -882,6 +882,11 @@ public int getNumClusterNodes() {
}
@Override
+ public int getNumClusterHosts() {
+ return scheduler.getNumClusterHosts();
+ }
+
+ @Override
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
return scheduler.getNodeReport(nodeId);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6c438f2..2d386bf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1967,6 +1967,15 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) {
NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE));
}
+ public static final String AM_BLACKLISTING_ENABLED =
+ YARN_PREFIX + "am.blacklisting.enabled";
+ public static final boolean DEFAULT_AM_BLACKLISTING_ENABLED = true;
+
+ public static final String AM_BLACKLISTING_DISABLE_THRESHOLD =
+ YARN_PREFIX + "am.blacklisting.disable-failure-threshold";
+ public static final float DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD = 0.8f;
+
public YarnConfiguration() {
super();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 53face0..d5c4988 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2179,4 +2179,22 @@
0
+
+  <property>
+    <description>
+      Enable/disable blacklisting of hosts for AM based on AM container
+      failures on those hosts.
+    </description>
+    <name>yarn.am.blacklisting.enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <description>
+      Fraction of NodeManager hosts that may be blacklisted for AM launches;
+      beyond this ratio blacklisting is disabled to avoid blacklisting the entire cluster.
+    </description>
+    <name>yarn.am.blacklisting.disable-failure-threshold</name>
+    <value>0.8f</value>
+  </property>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java
new file mode 100644
index 0000000..7062669
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+import java.util.List;
+
+/**
+ * Tracks blacklists based on failures reported on nodes.
+ */
+public interface BlacklistManager {
+
+ /**
+ * Report failure of a container on node.
+ * @param node that has a container failure
+ */
+ void addNodeContainerFailure(String node);
+
+ /**
+ * Get blacklist.
+ * @return blacklist.
+ */
+ List<String> getBlacklist();
+
+ /**
+ * Refresh the number of nodemanager hosts available for scheduling.
+ * @param nodeHostCount is the number of node hosts.
+ */
+ void refreshNodeHostCount(int nodeHostCount);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java
new file mode 100644
index 0000000..d79d080
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link BlacklistManager} that returns no blacklists.
+ */
+public class DisabledBlacklistManager implements BlacklistManager {
+
+ private static final List<String> noBlacklist = new ArrayList<String>();
+
+ @Override
+ public void addNodeContainerFailure(String node) {
+ }
+
+ @Override
+ public List<String> getBlacklist() {
+ return noBlacklist;
+ }
+
+ @Override
+ public void refreshNodeHostCount(int nodeHostCount) {
+ // Do nothing
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java
new file mode 100644
index 0000000..0a3dd7e
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Maintains a list of failed nodes and returns it as the blacklist as long as
+ * the number of blacklisted nodes stays below a threshold fraction of the
+ * total nodes. Once that threshold is crossed an empty blacklist is returned,
+ * so that previously blacklisted nodes become usable again and the entire
+ * cluster cannot be blacklisted.
+ */
+public class SimpleBlacklistManager implements BlacklistManager {
+
+ private int numberOfNodeManagerHosts;
+ private final double blacklistDisableFailureThreshold;
+ private final Set<String> blacklistNodes = new HashSet<>();
+ private static final ArrayList<String> EMPTY_LIST = new ArrayList<>();
+
+ private static final Log LOG = LogFactory.getLog(SimpleBlacklistManager.class);
+
+ public SimpleBlacklistManager(int numberOfNodeManagerHosts,
+ double blacklistDisableFailureThreshold) {
+ this.numberOfNodeManagerHosts = numberOfNodeManagerHosts;
+ this.blacklistDisableFailureThreshold = blacklistDisableFailureThreshold;
+ }
+
+ @Override
+ public void addNodeContainerFailure(String node) {
+ blacklistNodes.add(node);
+ }
+
+ @Override
+ public void refreshNodeHostCount(int nodeHostCount) {
+ this.numberOfNodeManagerHosts = nodeHostCount;
+ }
+
+ @Override
+ public List<String> getBlacklist() {
+ List<String> ret;
+ List<String> blacklist = new ArrayList<>(blacklistNodes);
+ final int currentBlacklistSize = blacklist.size();
+ final double failureThreshold = this.blacklistDisableFailureThreshold *
+ numberOfNodeManagerHosts;
+ if (currentBlacklistSize < failureThreshold) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("blacklist size " + currentBlacklistSize + " is less than " +
+ "failure threshold ratio " + blacklistDisableFailureThreshold +
+ " out of total usable nodes " + numberOfNodeManagerHosts);
+ }
+ ret = blacklist;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("blacklist size " + currentBlacklistSize + " is more than " +
+ "failure threshold ratio " + blacklistDisableFailureThreshold +
+ " out of total usable nodes " + numberOfNodeManagerHosts);
+ }
+ ret = EMPTY_LIST;
+ }
+ return ret;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 42ff1de..ededab5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -74,6 +74,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -133,6 +136,8 @@
private final Set<String> applicationTags;
private final long attemptFailuresValidityInterval;
+ private final boolean amBlacklistingEnabled;
+ private final float blacklistDisableThreshold;
private Clock systemClock;
@@ -447,7 +452,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
}
this.logAggregationEnabled =
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
if (this.logAggregationEnabled) {
this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START;
} else {
@@ -456,6 +461,14 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
maxLogAggregationDiagnosticsInMemory = conf.getInt(
YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
+
+ amBlacklistingEnabled = conf.getBoolean(
+ YarnConfiguration.AM_BLACKLISTING_ENABLED,
+ YarnConfiguration.DEFAULT_AM_BLACKLISTING_ENABLED);
+
+ blacklistDisableThreshold = conf.getFloat(
+ YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
+ YarnConfiguration.DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD);
}
@Override
@@ -796,6 +809,18 @@ public void recover(RMState state) {
private void createNewAttempt() {
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
+
+ BlacklistManager currentAMBlacklist = null;
+ if (currentAttempt != null) {
+ currentAMBlacklist = currentAttempt.getAMBlacklist();
+ } else {
+ if (amBlacklistingEnabled) {
+ currentAMBlacklist = new SimpleBlacklistManager(
+ scheduler.getNumClusterHosts(), blacklistDisableThreshold);
+ } else {
+ currentAMBlacklist = new DisabledBlacklistManager();
+ }
+ }
RMAppAttempt attempt =
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
submissionContext, conf,
@@ -803,7 +828,8 @@ private void createNewAttempt() {
// previously failed attempts(which should not include Preempted,
// hardware error and NM resync) + 1) equal to the max-attempt
// limit.
- maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq);
+ maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq,
+ currentAMBlacklist);
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
index b85174e..4dd8345 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
/**
@@ -185,6 +186,12 @@
ApplicationResourceUsageReport getApplicationResourceUsageReport();
/**
+ * Get the {@link BlacklistManager} that manages blacklists for AM failures
+ * @return the {@link BlacklistManager} that tracks AM failures.
+ */
+ BlacklistManager getAMBlacklist();
+
+ /**
* the start time of the application.
* @return the start time of the application.
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 74a4000..67039b8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -36,7 +36,6 @@
import javax.crypto.SecretKey;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -71,6 +70,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -182,6 +183,7 @@
private RMAppAttemptMetrics attemptMetrics = null;
private ResourceRequest amReq = null;
+ private BlacklistManager blacklistedNodesForAM = null;
private static final List<ResourceRequest> EMPTY_CONTAINER_REQUEST_LIST =
new ArrayList<ResourceRequest>();
+ private static final List<String> EMPTY_SYSTEM_BLACKLIST = new ArrayList<>();
+
@VisibleForTesting
public static final class ScheduleTransition
implements
@@ -939,7 +954,16 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
appAttempt.amReq.setResourceName(ResourceRequest.ANY);
appAttempt.amReq.setRelaxLocality(true);
-
+
+ appAttempt.getAMBlacklist().refreshNodeHostCount(
+ appAttempt.scheduler.getNumClusterHosts());
+
+ List<String> blacklist = appAttempt.getAMBlacklist().getBlacklist();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting system Blacklist for launching AM " + blacklist);
+ }
+ appAttempt.scheduler.updateSystemBlacklist(
+ appAttempt.getAppAttemptId().getApplicationId(), blacklist);
// AM resource has been checked when submission
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
@@ -1307,6 +1331,18 @@ public void transition(RMAppAttemptImpl appAttempt,
// Register with AMLivelinessMonitor
appAttempt.attemptLaunched();
+ // Remove the blacklist added by AM blacklist from shared blacklist
+ // only for managed AM.
+ if (!appAttempt.submissionContext.getUnmanagedAM()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing system blacklist from application " +
+ appAttempt.getAppAttemptId().getApplicationId());
+ }
+ appAttempt.scheduler.updateSystemBlacklist(
+ appAttempt.applicationAttemptId.getApplicationId(),
+ EMPTY_SYSTEM_BLACKLIST);
+ }
+
// register the ClientTokenMasterKey after it is saved in the store,
// otherwise client may hold an invalid ClientToken after RM restarts.
if (UserGroupInformation.isSecurityEnabled()) {
@@ -1694,6 +1730,7 @@ private void sendFinishedContainersToNM() {
private void sendAMContainerToNM(RMAppAttemptImpl appAttempt,
RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
NodeId nodeId = containerFinishedEvent.getNodeId();
+ appAttempt.addAMNodeToBlackList(containerFinishedEvent.getNodeId());
finishedContainersSentToAM.putIfAbsent(nodeId,
new ArrayList<ContainerStatus>());
appAttempt.finishedContainersSentToAM.get(nodeId).add(
@@ -1708,6 +1745,15 @@ private void sendAMContainerToNM(RMAppAttemptImpl appAttempt,
}
}
+ private void addAMNodeToBlackList(NodeId nodeId) {
+ blacklistedNodesForAM.addNodeContainerFailure(nodeId.getHost());
+ }
+
+ @Override
+ public BlacklistManager getAMBlacklist() {
+ return blacklistedNodesForAM;
+ }
+
private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index ed05189..6468cbe 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -65,6 +65,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
@@ -98,7 +100,12 @@
private long configuredMaximumAllocationWaitTime;
protected RMContext rmContext;
-
+
+ private int numClusterHosts = -1;
+ private long lastNumClusterHostsCheck = 0;
+ public final long numClusterHostsUpdateInterval = 60 * 1000; // 1 min
+ protected Clock clock;
+
/*
* All schedulers which are inheriting AbstractYarnScheduler should use
* concurrent version of 'applications' map.
@@ -117,10 +124,29 @@
* @param name service name
*/
public AbstractYarnScheduler(String name) {
+ this(name, new SystemClock());
+ }
+
+ public AbstractYarnScheduler(String name, Clock clock) {
super(name);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.maxAllocReadLock = lock.readLock();
this.maxAllocWriteLock = lock.writeLock();
+ this.clock = clock;
+ }
+
+ @Override
+ public int getNumClusterHosts() {
+ long now = clock.getTime();
+ if (now >= lastNumClusterHostsCheck + numClusterHostsUpdateInterval) {
+ lastNumClusterHostsCheck = now;
+ Set<String> hosts = new HashSet<>();
+ for (NodeId node : this.nodes.keySet()) {
+ hosts.add(node.getHost());
+ }
+ numClusterHosts = hosts.size();
+ }
+ return numClusterHosts;
}
@Override
@@ -236,6 +262,20 @@ public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
}
@Override
+ public void updateSystemBlacklist(ApplicationId appId,
+ List<String> blacklistedNodes) {
+ SchedulerApplication<T> app =
+ applications.get(appId);
+ if (app != null) {
+ final SchedulerApplicationAttempt currentAppAttempt =
+ app.getCurrentAppAttempt();
+ if (currentAppAttempt != null) {
+ currentAppAttempt.updateSystemBlacklist(blacklistedNodes);
+ }
+ }
+ }
+
+ @Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId appAttemptId) {
SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 77ac5b3..b1b29b6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -65,7 +65,8 @@
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
final Map<Priority, Map<String, ResourceRequest>> requests =
new ConcurrentHashMap<Priority, Map<String, ResourceRequest>>();
- private Set<String> blacklist = new HashSet<String>();
+ private Set<String> userBlacklist = new HashSet<>();
+ private Set<String> systemBlacklist = new HashSet<>();
//private final ApplicationStore store;
private ActiveUsersManager activeUsersManager;
@@ -217,24 +218,28 @@ synchronized public boolean updateResourceRequests(
}
/**
- * The ApplicationMaster is updating the blacklist
+ * The ApplicationMaster is updating the userBlacklist
*
- * @param blacklistAdditions resources to be added to the blacklist
- * @param blacklistRemovals resources to be removed from the blacklist
+ * @param blacklistAdditions resources to be added to the userBlacklist
+ * @param blacklistRemovals resources to be removed from the userBlacklist
*/
synchronized public void updateBlacklist(
List<String> blacklistAdditions, List<String> blacklistRemovals) {
- // Add to blacklist
+ // Add to userBlacklist
if (blacklistAdditions != null) {
- blacklist.addAll(blacklistAdditions);
+ userBlacklist.addAll(blacklistAdditions);
}
- // Remove from blacklist
+ // Remove from userBlacklist
if (blacklistRemovals != null) {
- blacklist.removeAll(blacklistRemovals);
+ userBlacklist.removeAll(blacklistRemovals);
}
}
+ synchronized public void updateSystemBlacklist(List<String> blacklist) {
+ systemBlacklist = new HashSet<>(blacklist);
+ }
+
synchronized public Collection<Priority> getPriorities() {
return priorities;
}
@@ -263,8 +268,14 @@ public synchronized Resource getResource(Priority priority) {
return (request == null) ? null : request.getCapability();
}
+ /**
+ * Returns whether the node is blacklisted by the user or by the system.
+ * @param resourceName the resource name
+ * @return true if the resource is blacklisted
+ */
public synchronized boolean isBlacklisted(String resourceName) {
- return blacklist.contains(resourceName);
+ return userBlacklist.contains(resourceName) ||
+ systemBlacklist.contains(resourceName);
}
/**
@@ -474,18 +485,18 @@ public synchronized void setQueue(Queue queue) {
}
public synchronized Set<String> getBlackList() {
- return this.blacklist;
+ return this.userBlacklist;
}
public synchronized Set<String> getBlackListCopy() {
- return new HashSet<>(this.blacklist);
+ return new HashSet<>(this.userBlacklist);
}
public synchronized void transferStateFromPreviousAppSchedulingInfo(
AppSchedulingInfo appInfo) {
// this.priorities = appInfo.getPriorities();
// this.requests = appInfo.getRequests();
- this.blacklist = appInfo.getBlackList();
+ this.userBlacklist = appInfo.getBlackList();
}
public synchronized void recoverContainer(RMContainer rmContainer) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 4872543..27d4da1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -516,7 +516,14 @@ public synchronized void updateBlacklist(
blacklistAdditions, blacklistRemovals);
}
}
-
+
+ public synchronized void updateSystemBlacklist(
+ List<String> blacklistedNodes) {
+ if (!isStopped) {
+ this.appSchedulingInfo.updateSystemBlacklist(blacklistedNodes);
+ }
+ }
+
public boolean isBlacklisted(String resourceName) {
return this.appSchedulingInfo.isBlacklisted(resourceName);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index 0fa23e1..849a022 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -120,7 +120,16 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
@Public
@Stable
public int getNumClusterNodes();
-
+
+ /**
+ * Get the number of unique node hosts available in the cluster.
+ * @return the number of unique node hosts.
+ */
+ @Public
+ @Stable
+ public int getNumClusterHosts();
+
/**
* The main api between the ApplicationMaster and the Scheduler.
* The ApplicationMaster is updating his future resource requirements
@@ -223,6 +232,14 @@ public String moveApplication(ApplicationId appId, String newQueue)
throws YarnException;
/**
+ * Updates the set of nodes blacklisted by the system (RM), as opposed to
+ * the user-provided blacklist.
+ * @param appId the application Id for which to update the system blacklist
+ * @param blacklistedNodes the list of nodes blacklisted by the system
+ */
+ public void updateSystemBlacklist(
+ ApplicationId appId, List<String> blacklistedNodes);
+
+ /**
* Completely drain sourceQueue of applications, by moving all of them to
* destQueue.
*
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 5080355..e464401 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -750,10 +750,7 @@ private static void waitForSchedulerAppAttemptAdded(
public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
- rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
- RMAppAttempt attempt = app.getCurrentAppAttempt();
- waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
- rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
+ RMAppAttempt attempt = waitForAttemptScheduled(app, rm);
System.out.println("Launch AM " + attempt.getAppAttemptId());
nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
@@ -761,6 +758,15 @@ public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
return am;
}
+ public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm)
+ throws Exception {
+ rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+ RMAppAttempt attempt = app.getCurrentAppAttempt();
+ waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
+ rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
+ return attempt;
+ }
+
public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
MockAM am = launchAM(app, rm, nm);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index d579595..779b38c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -35,8 +35,12 @@
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -49,11 +53,13 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
@@ -82,21 +88,7 @@ public void testAMRestartWithExistingContainers() throws Exception {
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
int NUM_CONTAINERS = 3;
- // allocate NUM_CONTAINERS containers
- am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
- new ArrayList<ContainerId>());
- nm1.nodeHeartbeat(true);
-
- // wait for containers to be allocated.
- List<Container> containers =
- am1.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>()).getAllocatedContainers();
- while (containers.size() != NUM_CONTAINERS) {
- nm1.nodeHeartbeat(true);
- containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>()).getAllocatedContainers());
- Thread.sleep(200);
- }
+ allocateContainers(nm1, am1, NUM_CONTAINERS);
// launch the 2nd container, for testing running container transferred.
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
@@ -244,6 +236,29 @@ public void testAMRestartWithExistingContainers() throws Exception {
rm1.stop();
}
+ private List<Container> allocateContainers(MockNM nm1, MockAM am1,
+ int NUM_CONTAINERS) throws Exception {
+ // allocate NUM_CONTAINERS containers
+ am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
+ new ArrayList<ContainerId>());
+ nm1.nodeHeartbeat(true);
+
+ // wait for containers to be allocated.
+ List<Container> containers =
+ am1.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>()).getAllocatedContainers();
+ while (containers.size() != NUM_CONTAINERS) {
+ nm1.nodeHeartbeat(true);
+ containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>()).getAllocatedContainers());
+ Thread.sleep(200);
+ }
+
+ Assert.assertEquals("Did not get all containers allocated",
+ NUM_CONTAINERS, containers.size());
+ return containers;
+ }
+
private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
throws InterruptedException {
int count = 0;
@@ -258,6 +273,9 @@ private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
public void testNMTokensRebindOnAMRestart() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
+ // To prevent the test from blacklisting nm1 for the AM, set the threshold
+ // to half of the 2 nodes, which is 1.
+ conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD, 0.5f);
MockRM rm1 = new MockRM(conf);
rm1.start();
@@ -355,6 +373,104 @@ public void testNMTokensRebindOnAMRestart() throws Exception {
rm1.stop();
}
+ @Test(timeout = 100000)
+ public void testAMBlacklistPreventsRestartOnSameNode() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ final DrainDispatcher dispatcher = new DrainDispatcher();
+ MockRM rm1 = new MockRM(conf, memStore) {
+ @Override
+ protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+ return new SchedulerEventDispatcher(this.scheduler) {
+ @Override
+ public void handle(SchedulerEvent event) {
+ scheduler.handle(event);
+ }
+ };
+ }
+
+ @Override
+ protected Dispatcher createDispatcher() {
+ return dispatcher;
+ }
+ };
+
+ rm1.start();
+
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ MockNM nm2 =
+ new MockNM("127.0.0.2:2345", 8000, rm1.getResourceTrackerService());
+ nm2.registerNode();
+
+ RMApp app1 = rm1.submitApp(200);
+
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ CapacityScheduler scheduler =
+ (CapacityScheduler) rm1.getResourceScheduler();
+ ContainerId amContainer =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+ // Preempt the first attempt;
+ RMContainer rmContainer = scheduler.getRMContainer(amContainer);
+ NodeId nodeWhereAMRan = rmContainer.getAllocatedNode();
+
+ scheduler.killContainer(rmContainer);
+ am1.waitForState(RMAppAttemptState.FAILED);
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+ // restart the am
+ RMAppAttempt attempt = rm1.waitForAttemptScheduled(app1, rm1);
+ System.out.println("Launch AM " + attempt.getAppAttemptId());
+
+ MockNM currentNode, otherNode;
+
+ if (nodeWhereAMRan.equals(nm1.getNodeId())) {
+ currentNode = nm1;
+ otherNode = nm2;
+ } else {
+ currentNode = nm2;
+ otherNode = nm1;
+ }
+
+ currentNode.nodeHeartbeat(true);
+ dispatcher.await();
+ Assert.assertEquals(
+ "AppAttemptState should still be SCHEDULED if currentNode is " +
+ "blacklisted correctly",
+ RMAppAttemptState.SCHEDULED,
+ attempt.getAppAttemptState());
+
+ otherNode.nodeHeartbeat(true);
+ dispatcher.await();
+
+ MockAM am2 = rm1.sendAMLaunched(attempt.getAppAttemptId());
+ rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
+
+ amContainer =
+ ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
+ rmContainer = scheduler.getRMContainer(amContainer);
+ nodeWhereAMRan = rmContainer.getAllocatedNode();
+ Assert.assertEquals(
+ "After blacklisting AM should have run on the other node",
+ otherNode.getNodeId(), nodeWhereAMRan);
+
+ RegisterApplicationMasterResponse registerResponse =
+ am2.registerAppAttempt();
+ rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+ List<Container> allocatedContainers =
+ allocateContainers(currentNode, am2, 1);
+ Assert.assertEquals(
+ "Even though AM is blacklisted from the node, application can still " +
+ "allocate containers there",
+ currentNode.getNodeId(), allocatedContainers.get(0).getNodeId());
+ }
+
// AM container preempted, nm disk failure
// should not be counted towards AM max retry count.
@Test(timeout = 100000)
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java
new file mode 100644
index 0000000..db2ebad
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class TestBlacklistManager {
+
+ @Test
+ public void testSimpleBlacklistBelowFailureThreshold() {
+ final int numberOfNodeManagerHosts = 3;
+ final double blacklistDisableFailureThreshold = 0.8;
+ BlacklistManager manager = new SimpleBlacklistManager(
+ numberOfNodeManagerHosts, blacklistDisableFailureThreshold);
+ String anyNode = "foo";
+ String anyNode2 = "bar";
+ manager.addNodeContainerFailure(anyNode);
+ manager.addNodeContainerFailure(anyNode2);
+ List<String> blacklist = manager.getBlacklist();
+ Collections.sort(blacklist);
+
+ String[] expectedBlacklist = new String[]{anyNode2, anyNode};
+ Assert.assertArrayEquals(
+ "Blacklist was not as expected",
+ expectedBlacklist,
+ blacklist.toArray());
+ }
+
+ @Test
+ public void testSimpleBlacklistAboveFailureThreshold() {
+ // Create a threshold of 0.5 * 3, i.e. 1.5 node failures.
+ BlacklistManager manager = new SimpleBlacklistManager(3, 0.5);
+ String anyNode = "foo";
+ String anyNode2 = "bar";
+ manager.addNodeContainerFailure(anyNode);
+ List<String> blacklist = manager.getBlacklist();
+ Collections.sort(blacklist);
+
+ String[] expectedBlacklist = new String[]{anyNode};
+ Assert.assertArrayEquals(
+ "Blacklist was not as expected",
+ expectedBlacklist,
+ blacklist.toArray());
+
+ manager.addNodeContainerFailure(anyNode2);
+
+ blacklist = manager.getBlacklist();
+ Assert.assertTrue(
+ "Blacklist should be empty but was " +
+ blacklist,
+ blacklist.isEmpty());
+ }
+
+ @Test
+ public void testDisabledBlacklist() {
+ BlacklistManager disabled = new DisabledBlacklistManager();
+ String anyNode = "foo";
+ disabled.addNodeContainerFailure(anyNode);
+ List<String> blacklist = disabled.getBlacklist();
+
+ Assert.assertTrue(
+ "Blacklist should be empty but was " +
+ blacklist,
+ blacklist.isEmpty());
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index fccfa19..484a1b6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -489,7 +489,7 @@ private RMApp createRMApp(Configuration conf) {
2, Resource.newInstance(10, 2), "test");
return new RMAppImpl(this.appId, this.rmContext,
conf, "test", "test", "default", submissionContext,
- this.rmContext.getScheduler(),
+ scheduler,
this.rmContext.getApplicationMasterService(),
System.currentTimeMillis(), "test",
null, null);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index 2e64d61..a5e3308 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -970,7 +970,7 @@ public void testRecoverApplication(ApplicationStateData appState,
appState.getApplicationSubmissionContext().getApplicationId(),
rmContext, conf,
submissionContext.getApplicationName(), null,
- submissionContext.getQueue(), submissionContext, null, null,
+ submissionContext.getQueue(), submissionContext, scheduler, null,
appState.getSubmitTime(), submissionContext.getApplicationType(),
submissionContext.getApplicationTags(),
BuilderUtils.newResourceRequest(
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
index ffd1c1f..add619e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -27,6 +28,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
@@ -49,8 +51,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
@@ -493,6 +497,30 @@ public void testResourceRequestRestoreWhenRMContainerIsAtAllocated()
}
}
+ @Test
+ public void testNumClusterHostsUpdate() {
+ @SuppressWarnings("unchecked")
+ AbstractYarnScheduler test = mock(AbstractYarnScheduler.class);
+
+ final ControlledClock testClock = new ControlledClock();
+ test.clock = testClock;
+ testClock.setTime(test.numClusterHostsUpdateInterval);
+ doCallRealMethod().when(test).getNumClusterHosts();
+
+ test.nodes = new ConcurrentHashMap<>();
+ FiCaSchedulerNode mockNode = mock(FiCaSchedulerNode.class);
+ test.nodes.put(NodeId.newInstance("host1", 1), mockNode);
+ test.nodes.put(NodeId.newInstance("host1", 2), mockNode);
+ test.nodes.put(NodeId.newInstance("host2", 1), mockNode);
+ Assert.assertEquals(2, test.getNumClusterHosts());
+
+ // Still return old value before we move beyond check interval
+ test.nodes.put(NodeId.newInstance("host3", 1), mockNode);
+ Assert.assertEquals(2, test.getNumClusterHosts());
+ testClock.setTime(testClock.getTime() + test.numClusterHostsUpdateInterval);
+ Assert.assertEquals(3, test.getNumClusterHosts());
+ }
+
private void verifyMaximumResourceCapability(
Resource expectedMaximumResource, AbstractYarnScheduler scheduler) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 403c8ea..1c9801d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -220,7 +220,7 @@ protected void createApplicationWithAMResource(ApplicationAttemptId attId,
ApplicationId appId = attId.getApplicationId();
RMApp rmApp = new RMAppImpl(appId, rmContext, conf,
null, user, null, ApplicationSubmissionContext.newInstance(appId, null,
- queue, null, null, false, false, 0, amResource, null), null, null,
+ queue, null, null, false, false, 0, amResource, null), scheduler, null,
0, null, null, null);
rmContext.getRMApps().put(appId, rmApp);
RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START);