diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a18ef7c..cdd287d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2003,6 +2003,15 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) { public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS = NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels"; + public static final String AM_BLACKLISTING_ENABLED = + YARN_PREFIX + "am.blacklisting.enabled"; + public static final boolean DEFAULT_AM_BLACKLISTING_ENABLED = true; + + public static final String AM_BLACKLISTING_DISABLE_THRESHOLD = + YARN_PREFIX + "am.blacklisting.disable-failure-threshold"; + public static final float DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD = 0.8f; + + public YarnConfiguration() { super(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 62ba599..11a1bb4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2250,4 +2250,22 @@ + + + Enable/disable blacklisting of hosts for AM based on AM failures on those + hosts. + + yarn.am.blacklisting.enabled + true + + + + + Threshold of ratio number of NodeManager hosts that are allowed to be + blacklisted for AM. Beyond this ratio there is no blacklisting to avoid + danger of blacklisting the entire cluster. + + yarn.am.blacklisting.disable-failure-threshold + 0.8f + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistAdditionsRemovals.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistAdditionsRemovals.java new file mode 100644 index 0000000..5b49bba --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistAdditionsRemovals.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.blacklist; + +import java.util.List; + +/** + * Class to track blacklist additions and removals. + */ +public class BlacklistAdditionsRemovals { + + private List blacklistAdditions; + private List blacklistRemovals; + + public BlacklistAdditionsRemovals(List blacklistAdditions, + List blacklistRemovals) { + this.blacklistAdditions = blacklistAdditions; + this.blacklistRemovals = blacklistRemovals; + } + + public List getBlacklistAdditions() { + return blacklistAdditions; + } + + public List getBlacklistRemovals() { + return blacklistRemovals; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java new file mode 100644 index 0000000..4b7f580 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.blacklist; + +/** + * Tracks blacklists based on failures reported on nodes. + */ +public interface BlacklistManager { + + /** + * Report failure of a container on node. + * @param node that has a container failure + */ + void addNodeContainerFailure(String node); + + /** + * Get {@link BlacklistAdditionsRemovals} that indicate which nodes should be + * added or to removed from the blacklist. + * @return {@link BlacklistAdditionsRemovals} + */ + BlacklistAdditionsRemovals getBlacklistAdditionsRemovals(); + + /** + * Refresh the number of nodemanager hosts available for scheduling. + * @param nodeHostCount is the number of node hosts. + */ + void refreshNodeHostCount(int nodeHostCount); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java new file mode 100644 index 0000000..bc2036c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.blacklist; + +import java.util.ArrayList; + +/** + * A {@link BlacklistManager} that returns no blacklists. + */ +public class DisabledBlacklistManager implements BlacklistManager{ + + private BlacklistAdditionsRemovals noBlacklist = + new BlacklistAdditionsRemovals(new ArrayList(), + new ArrayList()); + + @Override + public void addNodeContainerFailure(String node) { + } + + @Override + public BlacklistAdditionsRemovals getBlacklistAdditionsRemovals() { + return noBlacklist; + } + + @Override + public void refreshNodeHostCount(int nodeHostCount) { + // Do nothing + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java new file mode 100644 index 0000000..1e539c4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.blacklist; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Maintains a list of failed nodes and returns that as long as number of + * blacklisted nodes is below a threshold percentage of total nodes. If more + * than threshold number of nodes are marked as failure they all are returned + * as removal from blacklist so previous additions are reversed. + */ +public class SimpleBlacklistManager implements BlacklistManager { + + private int numberOfNodeManagerHosts; + private final double blacklistDisableFailureThreshold; + private final Set blacklistNodes = new HashSet<>(); + private static final ArrayList EMPTY_LIST = new ArrayList<>(); + + private static final Log LOG = LogFactory.getLog(SimpleBlacklistManager.class); + + public SimpleBlacklistManager(int numberOfNodeManagerHosts, + double blacklistDisableFailureThreshold) { + this.numberOfNodeManagerHosts = numberOfNodeManagerHosts; + this.blacklistDisableFailureThreshold = blacklistDisableFailureThreshold; + } + + @Override + public void addNodeContainerFailure(String node) { + blacklistNodes.add(node); + } + + @Override + public void refreshNodeHostCount(int nodeHostCount) { + this.numberOfNodeManagerHosts = nodeHostCount; + } + + @Override + public BlacklistAdditionsRemovals getBlacklistAdditionsRemovals() { + BlacklistAdditionsRemovals ret; + List blacklist = new ArrayList<>(blacklistNodes); + final int currentBlacklistSize = blacklist.size(); + final double failureThreshold = this.blacklistDisableFailureThreshold * + numberOfNodeManagerHosts; + if (currentBlacklistSize < failureThreshold) { + if (LOG.isDebugEnabled()) { + LOG.debug("blacklist size " + currentBlacklistSize + " is less than " + + "failure threshold ratio " + blacklistDisableFailureThreshold + + " out of total usable nodes " + numberOfNodeManagerHosts); + } + ret = new BlacklistAdditionsRemovals(blacklist, EMPTY_LIST); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("blacklist size " + currentBlacklistSize + " is more than " + + "failure threshold ratio " + blacklistDisableFailureThreshold + + " out of total usable nodes " + numberOfNodeManagerHosts); + } + ret = new BlacklistAdditionsRemovals(EMPTY_LIST, blacklist); + } + return ret; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 2eb74f7..a723ba8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -74,6 +74,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @@ -133,6 +136,8 @@ private final Set applicationTags; private final long attemptFailuresValidityInterval; + private final boolean amBlacklistingEnabled; + private final float blacklistDisableThreshold; private Clock systemClock; @@ -447,7 +452,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, } this.logAggregationEnabled = conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); if (this.logAggregationEnabled) { this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START; } else { @@ -456,6 +461,14 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, maxLogAggregationDiagnosticsInMemory = conf.getInt( YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY, YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY); + + amBlacklistingEnabled = conf.getBoolean( + YarnConfiguration.AM_BLACKLISTING_ENABLED, + YarnConfiguration.DEFAULT_AM_BLACKLISTING_ENABLED); + + blacklistDisableThreshold = conf.getFloat( + YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD, + YarnConfiguration.DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD); } @Override @@ -797,6 +810,18 @@ public void recover(RMState state) { private void createNewAttempt() { ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1); + + BlacklistManager currentAMBlacklist = null; + if (currentAttempt != null) { + currentAMBlacklist = currentAttempt.getAMBlacklist(); + } else { + if (amBlacklistingEnabled) { + currentAMBlacklist = new SimpleBlacklistManager( + scheduler.getNumClusterNodes(), blacklistDisableThreshold); + } else { + currentAMBlacklist = new DisabledBlacklistManager(); + } + } RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, submissionContext, conf, @@ -804,7 +829,8 @@ private void createNewAttempt() { // previously failed attempts(which should not include Preempted, // hardware error and NM resync) + 1) equal to the max-attempt // limit. - maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq); + maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq, + currentAMBlacklist); attempts.put(appAttemptId, attempt); currentAttempt = attempt; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index b85174e..4dd8345 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; /** @@ -185,6 +186,12 @@ ApplicationResourceUsageReport getApplicationResourceUsageReport(); /** + * Get the {@link BlacklistManager} that manages blacklists for AM failures + * @return the {@link BlacklistManager} that tracks AM failures. + */ + BlacklistManager getAMBlacklist(); + + /** * the start time of the application. * @return the start time of the application. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 74a4000..74e61f0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -36,7 +36,6 @@ import javax.crypto.SecretKey; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -71,6 +70,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistAdditionsRemovals; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -182,6 +184,7 @@ private RMAppAttemptMetrics attemptMetrics = null; private ResourceRequest amReq = null; + private BlacklistManager blacklistedNodesForAM = null; private static final StateMachineFactory EMPTY_CONTAINER_REQUEST_LIST = new ArrayList(); + private static final List EMPTY_SYSTEM_BLACKLIST = new ArrayList<>(); + @VisibleForTesting public static final class ScheduleTransition implements @@ -939,12 +955,25 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY); appAttempt.amReq.setResourceName(ResourceRequest.ANY); appAttempt.amReq.setRelaxLocality(true); - + + appAttempt.getAMBlacklist().refreshNodeHostCount( + appAttempt.scheduler.getNumClusterNodes()); + + BlacklistAdditionsRemovals amBlacklist = appAttempt.getAMBlacklist() + .getBlacklistAdditionsRemovals(); + if (LOG.isDebugEnabled()) { + LOG.debug("Using blacklist for AM: additions(" + + amBlacklist.getBlacklistAdditions() + ") and removals(" + + amBlacklist.getBlacklistRemovals() + ")"); + } // AM resource has been checked when submission Allocation amContainerAllocation = - appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, + appAttempt.scheduler.allocate( + appAttempt.applicationAttemptId, Collections.singletonList(appAttempt.amReq), - EMPTY_CONTAINER_RELEASE_LIST, null, null); + EMPTY_CONTAINER_RELEASE_LIST, + amBlacklist.getBlacklistAdditions(), + amBlacklist.getBlacklistRemovals()); if (amContainerAllocation != null && amContainerAllocation.getContainers() != null) { assert (amContainerAllocation.getContainers().size() == 0); @@ -1331,7 +1360,11 @@ public boolean shouldCountTowardsMaxAttemptRetry() { } } - private static final class UnmanagedAMAttemptSavedTransition + private boolean shouldCountTowardsNodeBlacklisting(int exitStatus) { + return exitStatus == ContainerExitStatus.DISKS_FAILED; + } + + private static final class UnmanagedAMAttemptSavedTransition extends AMLaunchedTransition { @Override public void transition(RMAppAttemptImpl appAttempt, @@ -1694,6 +1727,14 @@ private void sendFinishedContainersToNM() { private void sendAMContainerToNM(RMAppAttemptImpl appAttempt, RMAppAttemptContainerFinishedEvent containerFinishedEvent) { NodeId nodeId = containerFinishedEvent.getNodeId(); + if (containerFinishedEvent.getContainerStatus() != null) { + if (shouldCountTowardsNodeBlacklisting(containerFinishedEvent + .getContainerStatus().getExitStatus())) { + appAttempt.addAMNodeToBlackList(containerFinishedEvent.getNodeId()); + } + } else { + LOG.warn("No ContainerStatus in containerFinishedEvent"); + } finishedContainersSentToAM.putIfAbsent(nodeId, new ArrayList()); appAttempt.finishedContainersSentToAM.get(nodeId).add( @@ -1708,6 +1749,15 @@ private void sendAMContainerToNM(RMAppAttemptImpl appAttempt, } } + private void addAMNodeToBlackList(NodeId nodeId) { + blacklistedNodesForAM.addNodeContainerFailure(nodeId.getHost().toString()); + } + + @Override + public BlacklistManager getAMBlacklist() { + return blacklistedNodesForAM; + } + private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt, RMAppAttemptContainerFinishedEvent containerFinishedEvent) { appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 0b6b8ef..716aa00 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -98,7 +98,7 @@ private long configuredMaximumAllocationWaitTime; protected RMContext rmContext; - + private volatile Priority maxClusterLevelAppPriority; /* diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 77ac5b3..6f17e5f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -65,7 +65,8 @@ new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator()); final Map> requests = new ConcurrentHashMap>(); - private Set blacklist = new HashSet(); + private Set userBlacklist = new HashSet(); + private Set systemBlacklist = new HashSet(); //private final ApplicationStore store; private ActiveUsersManager activeUsersManager; @@ -217,19 +218,29 @@ synchronized public boolean updateResourceRequests( } /** - * The ApplicationMaster is updating the blacklist + * The ApplicationMaster is updating the userBlacklist * - * @param blacklistAdditions resources to be added to the blacklist - * @param blacklistRemovals resources to be removed from the blacklist + * @param blacklistAdditions resources to be added to the userBlacklist + * @param blacklistRemovals resources to be removed from the userBlacklist */ synchronized public void updateBlacklist( List blacklistAdditions, List blacklistRemovals) { - // Add to blacklist + updateBlacklist(userBlacklist, blacklistAdditions, blacklistRemovals); + } + + synchronized public void updateSystemBlacklist( + List blacklistAdditions, List blacklistRemovals) { + updateBlacklist(systemBlacklist, blacklistAdditions, blacklistRemovals); + } + + void updateBlacklist(Set blacklist, + List blacklistAdditions, List blacklistRemovals) { + // Add to userBlacklist if (blacklistAdditions != null) { blacklist.addAll(blacklistAdditions); } - // Remove from blacklist + // Remove from userBlacklist if (blacklistRemovals != null) { blacklist.removeAll(blacklistRemovals); } @@ -263,8 +274,16 @@ public synchronized Resource getResource(Priority priority) { return (request == null) ? null : request.getCapability(); } - public synchronized boolean isBlacklisted(String resourceName) { - return blacklist.contains(resourceName); + /** + * Returns if the node is either blacklisted by the user or the system + * @param resourceName the resourcename + * @param useSystemBlacklist true if it should check systemBlacklist + * @return true if its blacklisted + */ + public synchronized boolean isBlacklisted(String resourceName, + boolean useSystemBlacklist) { + return useSystemBlacklist ? systemBlacklist.contains(resourceName) + : userBlacklist.contains(resourceName); } /** @@ -474,18 +493,18 @@ public synchronized void setQueue(Queue queue) { } public synchronized Set getBlackList() { - return this.blacklist; + return this.userBlacklist; } public synchronized Set getBlackListCopy() { - return new HashSet<>(this.blacklist); + return new HashSet<>(this.userBlacklist); } public synchronized void transferStateFromPreviousAppSchedulingInfo( AppSchedulingInfo appInfo) { // this.priorities = appInfo.getPriorities(); // this.requests = appInfo.getRequests(); - this.blacklist = appInfo.getBlackList(); + this.userBlacklist = appInfo.getBlackList(); } public synchronized void recoverContainer(RMContainer rmContainer) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 4872543..c370321 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -470,16 +470,9 @@ public ContainersAndNMTokensAllocation(List containerList, RMContainer rmContainer = i.next(); Container container = rmContainer.getContainer(); ContainerType containerType = ContainerType.TASK; - // The working knowledge is that masterContainer for AM is null as it - // itself is the master container. - RMAppAttempt appAttempt = - rmContext - .getRMApps() - .get( - container.getId().getApplicationAttemptId() - .getApplicationId()).getCurrentAppAttempt(); - if (appAttempt.getMasterContainer() == null - && appAttempt.getSubmissionContext().getUnmanagedAM() == false) { + boolean isWaitingForAMContainer = isWaitingForAMContainer( + container.getId().getApplicationAttemptId().getApplicationId()); + if (isWaitingForAMContainer) { containerType = ContainerType.APPLICATION_MASTER; } try { @@ -509,6 +502,17 @@ public ContainersAndNMTokensAllocation(List containerList, return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens); } + public boolean isWaitingForAMContainer(ApplicationId applicationId) { + // The working knowledge is that masterContainer for AM is null as it + // itself is the master container. + RMAppAttempt appAttempt = + rmContext + .getRMApps() + .get(applicationId).getCurrentAppAttempt(); + return (appAttempt.getMasterContainer() == null + && appAttempt.getSubmissionContext().getUnmanagedAM() == false); + } + public synchronized void updateBlacklist( List blacklistAdditions, List blacklistRemovals) { if (!isStopped) { @@ -516,9 +520,18 @@ public synchronized void updateBlacklist( blacklistAdditions, blacklistRemovals); } } - + + public synchronized void updateSystemBlacklist( + List blacklistAdditions, List blacklistRemovals) { + if (!isStopped) { + this.appSchedulingInfo.updateSystemBlacklist( + blacklistAdditions, blacklistRemovals); + } + } + public boolean isBlacklisted(String resourceName) { - return this.appSchedulingInfo.isBlacklisted(resourceName); + boolean useSystemBlacklist = isWaitingForAMContainer(getApplicationId()); + return this.appSchedulingInfo.isBlacklisted(resourceName, useSystemBlacklist); } public synchronized int addMissedNonPartitionedRequestSchedulingOpportunity( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index f2753e6..e4577f3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -122,7 +122,8 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, @Public @Stable public int getNumClusterNodes(); - + + /** * The main api between the ApplicationMaster and the Scheduler. * The ApplicationMaster is updating his future resource requirements diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index b5ccbd9..ac65eb0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -934,7 +934,13 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, } } - application.updateBlacklist(blacklistAdditions, blacklistRemovals); + if (application.isWaitingForAMContainer(application.getApplicationId())) { + // Allocate is for AM and update AM blacklist for this + application.updateSystemBlacklist( + blacklistAdditions, blacklistRemovals); + } else { + application.updateBlacklist(blacklistAdditions, blacklistRemovals); + } allocation = application.getAllocation(getResourceCalculator(), clusterResource, getMinimumResourceCapability()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java deleted file mode 100644 index 9bece9b..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; - -import org.apache.commons.logging.Log; - -public class FiCaSchedulerUtils { - - public static boolean isBlacklisted(FiCaSchedulerApp application, - FiCaSchedulerNode node, Log LOG) { - if (application.isBlacklisted(node.getNodeName())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping 'host' " + node.getNodeName() + - " for " + application.getApplicationId() + - " since it has been blacklisted"); - } - return true; - } - - if (application.isBlacklisted(node.getRackName())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping 'rack' " + node.getRackName() + - " for " + application.getApplicationId() + - " since it has been blacklisted"); - } - return true; - } - - return false; - } - -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3eefb8f..8fd4511 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -955,7 +955,14 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, preemptionContainerIds.add(container.getContainerId()); } - application.updateBlacklist(blacklistAdditions, blacklistRemovals); + if (application.isWaitingForAMContainer(application.getApplicationId())) { + // Allocate is for AM and update AM blacklist for this + application.updateSystemBlacklist( + blacklistAdditions, blacklistRemovals); + } else { + application.updateBlacklist(blacklistAdditions, blacklistRemovals); + } + ContainersAndNMTokensAllocation allocation = application.pullNewlyAllocatedContainersAndNMTokens(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 6b77ceb..66a2298 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -352,11 +352,18 @@ public Allocation allocate( application.showRequests(); LOG.debug("allocate:" + - " applicationId=" + applicationAttemptId + + " applicationId=" + applicationAttemptId + " #ask=" + ask.size()); } - application.updateBlacklist(blacklistAdditions, blacklistRemovals); + if (application.isWaitingForAMContainer(application.getApplicationId())) { + // Allocate is for AM and update AM blacklist for this + application.updateSystemBlacklist( + blacklistAdditions, blacklistRemovals); + } else { + application.updateBlacklist(blacklistAdditions, blacklistRemovals); + } + ContainersAndNMTokensAllocation allocation = application.pullNewlyAllocatedContainersAndNMTokens(); Resource headroom = application.getHeadroom(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 5080355..e464401 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -750,10 +750,7 @@ private static void waitForSchedulerAppAttemptAdded( public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) throws Exception { - rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); - RMAppAttempt attempt = app.getCurrentAppAttempt(); - waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); - rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED); + RMAppAttempt attempt = waitForAttemptScheduled(app, rm); System.out.println("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); @@ -761,6 +758,15 @@ public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) return am; } + public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm) + throws Exception { + rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); + RMAppAttempt attempt = app.getCurrentAppAttempt(); + waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); + rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED); + return attempt; + } + public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm) throws Exception { MockAM am = launchAM(app, rm, nm); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index d579595..9000bb9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -35,8 +35,12 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -49,11 +53,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; + +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.SystemClock; @@ -82,21 +92,7 @@ public void testAMRestartWithExistingContainers() throws Exception { MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); int NUM_CONTAINERS = 3; - // allocate NUM_CONTAINERS containers - am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS, - new ArrayList()); - nm1.nodeHeartbeat(true); - - // wait for containers to be allocated. - List containers = - am1.allocate(new ArrayList(), - new ArrayList()).getAllocatedContainers(); - while (containers.size() != NUM_CONTAINERS) { - nm1.nodeHeartbeat(true); - containers.addAll(am1.allocate(new ArrayList(), - new ArrayList()).getAllocatedContainers()); - Thread.sleep(200); - } + allocateContainers(nm1, am1, NUM_CONTAINERS); // launch the 2nd container, for testing running container transferred. nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); @@ -244,6 +240,29 @@ public void testAMRestartWithExistingContainers() throws Exception { rm1.stop(); } + private List allocateContainers(MockNM nm1, MockAM am1, + int NUM_CONTAINERS) throws Exception { + // allocate NUM_CONTAINERS containers + am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS, + new ArrayList()); + nm1.nodeHeartbeat(true); + + // wait for containers to be allocated. + List containers = + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (containers.size() != NUM_CONTAINERS) { + nm1.nodeHeartbeat(true); + containers.addAll(am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(200); + } + + Assert.assertEquals("Did not get all containers allocated", + NUM_CONTAINERS, containers.size()); + return containers; + } + private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt) throws InterruptedException { int count = 0; @@ -258,6 +277,9 @@ private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt) public void testNMTokensRebindOnAMRestart() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3); + // To prevent test from blacklisting nm1 for AM, we sit threshold to half + // of 2 nodes which is 1 + conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD, 0.5f); MockRM rm1 = new MockRM(conf); rm1.start(); @@ -355,6 +377,108 @@ public void testNMTokensRebindOnAMRestart() throws Exception { rm1.stop(); } + @Test(timeout = 100000) + public void testAMBlacklistPreventsRestartOnSameNode() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + final DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm1 = new MockRM(conf, memStore) { + @Override + protected EventHandler createSchedulerEventDispatcher() { + return new SchedulerEventDispatcher(this.scheduler) { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } + + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + + rm1.start(); + + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + + MockNM nm2 = + new MockNM("127.0.0.2:2345", 8000, rm1.getResourceTrackerService()); + nm2.registerNode(); + + RMApp app1 = rm1.submitApp(200); + + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + CapacityScheduler scheduler = + (CapacityScheduler) rm1.getResourceScheduler(); + ContainerId amContainer = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + // Preempt the first attempt; + RMContainer rmContainer = scheduler.getRMContainer(amContainer); + NodeId nodeWhereAMRan = rmContainer.getAllocatedNode(); + + MockNM currentNode, otherNode; + if (nodeWhereAMRan == nm1.getNodeId()) { + currentNode = nm1; + otherNode = nm2; + } else { + currentNode = nm2; + otherNode = nm1; + } + + ContainerStatus containerStatus = + BuilderUtils.newContainerStatus(amContainer, ContainerState.COMPLETE, + "", ContainerExitStatus.DISKS_FAILED); + currentNode.containerStatus(containerStatus); + am1.waitForState(RMAppAttemptState.FAILED); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // restart the am + RMAppAttempt attempt = rm1.waitForAttemptScheduled(app1, rm1); + System.out.println("Launch AM " + attempt.getAppAttemptId()); + + + + currentNode.nodeHeartbeat(true); + dispatcher.await(); + Assert.assertEquals( + "AppAttemptState should still be SCHEDULED if currentNode is " + + "blacklisted correctly", + RMAppAttemptState.SCHEDULED, + attempt.getAppAttemptState()); + + otherNode.nodeHeartbeat(true); + dispatcher.await(); + + MockAM am2 = rm1.sendAMLaunched(attempt.getAppAttemptId()); + rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); + + amContainer = + ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); + rmContainer = scheduler.getRMContainer(amContainer); + nodeWhereAMRan = rmContainer.getAllocatedNode(); + Assert.assertEquals( + "After blacklisting AM should have run on the other node", + otherNode.getNodeId(), nodeWhereAMRan); + + RegisterApplicationMasterResponse registerResponse = + am2.registerAppAttempt(); + rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + List allocatedContainers = + allocateContainers(currentNode, am2, 1); + Assert.assertEquals( + "Even though AM is blacklisted from the node, application can still " + + "allocate containers there", + currentNode.getNodeId(), allocatedContainers.get(0).getNodeId()); + } + // AM container preempted, nm disk failure // should not be counted towards AM max retry count. @Test(timeout = 100000) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java new file mode 100644 index 0000000..f86ae33 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.blacklist; + + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +public class TestBlacklistManager { + + @Test + public void testSimpleBlacklistBelowFailureThreshold() { + final int numberOfNodeManagerHosts = 3; + final double blacklistDisableFailureThreshold = 0.8; + BlacklistManager manager = new SimpleBlacklistManager( + numberOfNodeManagerHosts, blacklistDisableFailureThreshold); + String anyNode = "foo"; + String anyNode2 = "bar"; + manager.addNodeContainerFailure(anyNode); + manager.addNodeContainerFailure(anyNode2); + BlacklistAdditionsRemovals blacklist = manager + .getBlacklistAdditionsRemovals(); + + List blacklistAdditions = blacklist.getBlacklistAdditions(); + Collections.sort(blacklistAdditions); + List blacklistRemovals = blacklist.getBlacklistRemovals(); + String[] expectedBlacklistAdditions = new String[]{anyNode2, anyNode}; + Assert.assertArrayEquals( + "Blacklist additions was not as expected", + expectedBlacklistAdditions, + blacklistAdditions.toArray()); + Assert.assertTrue( + "Blacklist removals should be empty but was " + + blacklistRemovals, + blacklistRemovals.isEmpty()); + } + + @Test + public void testSimpleBlacklistAboveFailureThreshold() { + // Create a threshold of 0.5 * 3 i.e at 1.5 node failures. + BlacklistManager manager = new SimpleBlacklistManager(3, 0.5); + String anyNode = "foo"; + String anyNode2 = "bar"; + manager.addNodeContainerFailure(anyNode); + BlacklistAdditionsRemovals blacklist = manager + .getBlacklistAdditionsRemovals(); + + List blacklistAdditions = blacklist.getBlacklistAdditions(); + Collections.sort(blacklistAdditions); + List blacklistRemovals = blacklist.getBlacklistRemovals(); + String[] expectedBlacklistAdditions = new String[]{anyNode}; + Assert.assertArrayEquals( + "Blacklist additions was not as expected", + expectedBlacklistAdditions, + blacklistAdditions.toArray()); + Assert.assertTrue( + "Blacklist removals should be empty but was " + + blacklistRemovals, + blacklistRemovals.isEmpty()); + + manager.addNodeContainerFailure(anyNode2); + + blacklist = manager + .getBlacklistAdditionsRemovals(); + blacklistAdditions = blacklist.getBlacklistAdditions(); + Collections.sort(blacklistAdditions); + blacklistRemovals = blacklist.getBlacklistRemovals(); + Collections.sort(blacklistRemovals); + String[] expectedBlacklistRemovals = new String[] {anyNode2, anyNode}; + Assert.assertTrue( + "Blacklist additions should be empty but was " + + blacklistAdditions, + blacklistAdditions.isEmpty()); + Assert.assertArrayEquals( + "Blacklist removals was not as expected", + expectedBlacklistRemovals, + blacklistRemovals.toArray()); + } + + @Test + public void testDisabledBlacklist() { + BlacklistManager disabled = new DisabledBlacklistManager(); + String anyNode = "foo"; + disabled.addNodeContainerFailure(anyNode); + BlacklistAdditionsRemovals blacklist = disabled + .getBlacklistAdditionsRemovals(); + + List blacklistAdditions = blacklist.getBlacklistAdditions(); + List blacklistRemovals = blacklist.getBlacklistRemovals(); + Assert.assertTrue( + "Blacklist additions should be empty but was " + + blacklistAdditions, + blacklistAdditions.isEmpty()); + Assert.assertTrue( + "Blacklist removals should be empty but was " + + blacklistRemovals, + blacklistRemovals.isEmpty()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index fccfa19..484a1b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -489,7 +489,7 @@ private RMApp createRMApp(Configuration conf) { 2, Resource.newInstance(10, 2), "test"); return new RMAppImpl(this.appId, this.rmContext, conf, "test", "test", "default", submissionContext, - this.rmContext.getScheduler(), + scheduler, this.rmContext.getApplicationMasterService(), System.currentTimeMillis(), "test", null, null); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 2e64d61..a5e3308 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -970,7 +970,7 @@ public void testRecoverApplication(ApplicationStateData appState, appState.getApplicationSubmissionContext().getApplicationId(), rmContext, conf, submissionContext.getApplicationName(), null, - submissionContext.getQueue(), submissionContext, null, null, + submissionContext.getQueue(), submissionContext, scheduler, null, appState.getSubmitTime(), submissionContext.getApplicationType(), submissionContext.getApplicationTags(), BuilderUtils.newResourceRequest( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index ffd1c1f..81c474ea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -27,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.Container; @@ -49,8 +51,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 403c8ea..1c9801d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -220,7 +220,7 @@ protected void createApplicationWithAMResource(ApplicationAttemptId attId, ApplicationId appId = attId.getApplicationId(); RMApp rmApp = new RMAppImpl(appId, rmContext, conf, null, user, null, ApplicationSubmissionContext.newInstance(appId, null, - queue, null, null, false, false, 0, amResource, null), null, null, + queue, null, null, false, false, 0, amResource, null), scheduler, null, 0, null, null, null); rmContext.getRMApps().put(appId, rmApp); RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START);