diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 08cb1e6..948725b 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -881,6 +881,11 @@ public int getNumClusterNodes() { } @Override + public int getNumClusterHosts() { + return scheduler.getNumClusterHosts(); + } + + @Override public SchedulerNodeReport getNodeReport(NodeId nodeId) { return scheduler.getNodeReport(nodeId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6b660f7..c082dfc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1934,6 +1934,10 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) { NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE)); } + public static final String AM_BLACKLISTING_ENABLED = + YARN_PREFIX + "am.blacklisting.enabled"; + public static final boolean DEFAULT_AM_BLACKLISTING_ENABLED = false; + public YarnConfiguration() { super(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistAdditionsRemovals.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistAdditionsRemovals.java new file mode 100644 index 0000000..5b49bba --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistAdditionsRemovals.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.blacklist; + +import java.util.List; + +/** + * Class to track blacklist additions and removals. + */ +public class BlacklistAdditionsRemovals { + + private List blacklistAdditions; + private List blacklistRemovals; + + public BlacklistAdditionsRemovals(List blacklistAdditions, + List blacklistRemovals) { + this.blacklistAdditions = blacklistAdditions; + this.blacklistRemovals = blacklistRemovals; + } + + public List getBlacklistAdditions() { + return blacklistAdditions; + } + + public List getBlacklistRemovals() { + return blacklistRemovals; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java new file mode 100644 index 0000000..4b7f580 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.blacklist; + +/** + * Tracks blacklists based on failures reported on nodes. + */ +public interface BlacklistManager { + + /** + * Report failure of a container on node. + * @param node that has a container failure + */ + void addNodeContainerFailure(String node); + + /** + * Get {@link BlacklistAdditionsRemovals} that indicate which nodes should be + * added or to removed from the blacklist. + * @return {@link BlacklistAdditionsRemovals} + */ + BlacklistAdditionsRemovals getBlacklistAdditionsRemovals(); + + /** + * Refresh the number of nodemanager hosts available for scheduling. + * @param nodeHostCount is the number of node hosts. + */ + void refreshNodeHostCount(int nodeHostCount); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java new file mode 100644 index 0000000..bc2036c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.blacklist; + +import java.util.ArrayList; + +/** + * A {@link BlacklistManager} that returns no blacklists. + */ +public class DisabledBlacklistManager implements BlacklistManager{ + + private BlacklistAdditionsRemovals noBlacklist = + new BlacklistAdditionsRemovals(new ArrayList(), + new ArrayList()); + + @Override + public void addNodeContainerFailure(String node) { + } + + @Override + public BlacklistAdditionsRemovals getBlacklistAdditionsRemovals() { + return noBlacklist; + } + + @Override + public void refreshNodeHostCount(int nodeHostCount) { + // Do nothing + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java new file mode 100644 index 0000000..4ceae16 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.blacklist; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Maintains a list of failed nodes and returns that as long as number of + * blacklisted nodes is below a threshold percentage of total nodes. If more + * than threshold number of nodes are marked as failure they all are returned + * as removal from blacklist so previous additions are reversed. + */ +public class SimpleBlacklistManager implements BlacklistManager { + + public static final double DEFAULT_FAILURE_THRESHOLD = 0.8; + + private int numberOfNodeManagerHosts; + private final double failureThresholdRatio; + private final Set blacklistNodes = new HashSet<>(); + private static final ArrayList EMPTY_LIST = new ArrayList<>(); + + private static final Log LOG = LogFactory.getLog(SimpleBlacklistManager.class); + + public SimpleBlacklistManager(int numberOfNodeManagerHosts) { + this(numberOfNodeManagerHosts, DEFAULT_FAILURE_THRESHOLD); + } + + public SimpleBlacklistManager(int numberOfNodeManagerHosts, + double failureThresholdRatio) { + this.numberOfNodeManagerHosts = numberOfNodeManagerHosts; + this.failureThresholdRatio = failureThresholdRatio; + } + + @Override + public void addNodeContainerFailure(String node) { + blacklistNodes.add(node); + } + + @Override + public void refreshNodeHostCount(int nodeHostCount) { + this.numberOfNodeManagerHosts = nodeHostCount; + } + + @Override + public BlacklistAdditionsRemovals getBlacklistAdditionsRemovals() { + BlacklistAdditionsRemovals ret; + List blacklist = new ArrayList<>(blacklistNodes); + final int currentBlacklistSize = blacklist.size(); + final double failureThreshold = this.failureThresholdRatio * + numberOfNodeManagerHosts; + if (currentBlacklistSize < failureThreshold) { + if (LOG.isDebugEnabled()) { + LOG.debug("blacklist size " + currentBlacklistSize + " is less than " + + "failure threshold ratio " + failureThresholdRatio + " out of " + + "total usable nodes " + numberOfNodeManagerHosts); + } + ret = new BlacklistAdditionsRemovals(blacklist, EMPTY_LIST); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("blacklist size " + currentBlacklistSize + " is more than " + + "failure threshold ratio " + failureThresholdRatio + " out of " + + "total usable nodes " + numberOfNodeManagerHosts); + } + ret = new BlacklistAdditionsRemovals(EMPTY_LIST, blacklist); + } + return ret; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 90e63c1..47ca22f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -74,6 +74,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @@ -133,6 +136,7 @@ private final Set applicationTags; private final long attemptFailuresValidityInterval; + private final boolean amBlacklistingEnabled; private Clock systemClock; @@ -456,6 +460,10 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, maxLogAggregationDiagnosticsInMemory = conf.getInt( YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY, YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY); + + amBlacklistingEnabled = conf.getBoolean( + YarnConfiguration.AM_BLACKLISTING_ENABLED, + YarnConfiguration.DEFAULT_AM_BLACKLISTING_ENABLED); } @Override @@ -795,6 +803,18 @@ public void recover(RMState state) { private void createNewAttempt() { ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1); + + BlacklistManager currentAMBlacklist = null; + if (currentAttempt != null) { + currentAMBlacklist = currentAttempt.getAMBlacklist(); + } else { + if (amBlacklistingEnabled) { + currentAMBlacklist = new SimpleBlacklistManager( + scheduler.getNumClusterHosts()); + } else { + currentAMBlacklist = new DisabledBlacklistManager(); + } + } RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, submissionContext, conf, @@ -802,7 +822,8 @@ private void createNewAttempt() { // previously failed attempts(which should not include Preempted, // hardware error and NM resync) + 1) equal to the max-attempt // limit. - maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq); + maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq, + currentAMBlacklist); attempts.put(appAttemptId, attempt); currentAttempt = attempt; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index b85174e..4dd8345 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; /** @@ -185,6 +186,12 @@ ApplicationResourceUsageReport getApplicationResourceUsageReport(); /** + * Get the {@link BlacklistManager} that manages blacklists for AM failures + * @return the {@link BlacklistManager} that tracks AM failures. + */ + BlacklistManager getAMBlacklist(); + + /** * the start time of the application. * @return the start time of the application. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 5171bba..73398e4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -71,6 +71,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistAdditionsRemovals; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; +import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -182,6 +185,7 @@ private RMAppAttemptMetrics attemptMetrics = null; private ResourceRequest amReq = null; + private BlacklistManager blacklistedNodesForAM = null; private static final StateMachineFactory()); appAttempt.finishedContainersSentToAM.get(nodeId).add( @@ -1692,6 +1736,15 @@ private void sendAMContainerToNM(RMAppAttemptImpl appAttempt, } } + private void addAMNodeToBlackList(NodeId nodeId) { + blacklistedNodesForAM.addNodeContainerFailure(nodeId.getHost().toString()); + } + + @Override + public BlacklistManager getAMBlacklist() { + return blacklistedNodesForAM; + } + private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt, RMAppAttemptContainerFinishedEvent containerFinishedEvent) { appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index ae927f1..fd85bbc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -122,6 +122,15 @@ public AbstractYarnScheduler(String name) { } @Override + public int getNumClusterHosts() { + Set hosts = new HashSet<>(); + for (NodeId node : this.nodes.keySet()) { + hosts.add(node.getHost()); + } + return hosts.size(); + } + + @Override public void serviceInit(Configuration conf) throws Exception { nmExpireInterval = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index b99b217..d348073 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -119,7 +119,16 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, @Public @Stable public int getNumClusterNodes(); - + + + /** + * Get the number of unique node hosts available in the cluster. + * @return the number of unique node hosts. + */ + @Public + @Stable + public int getNumClusterHosts(); + /** * The main api between the ApplicationMaster and the Scheduler. * The ApplicationMaster is updating his future resource requirements diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index d068a94..acb454f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -733,10 +733,7 @@ private static void waitForSchedulerAppAttemptAdded( public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) throws Exception { - rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); - RMAppAttempt attempt = app.getCurrentAppAttempt(); - waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); - rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED); + RMAppAttempt attempt = waitForAttemptScheduled(app, rm); System.out.println("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); @@ -744,6 +741,15 @@ public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) return am; } + public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm) + throws Exception { + rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); + RMAppAttempt attempt = app.getCurrentAppAttempt(); + waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); + rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED); + return attempt; + } + public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm) throws Exception { MockAM am = launchAM(app, rm, nm); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 3387f41..4696687 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -35,8 +35,12 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -49,11 +53,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.SystemClock; @@ -82,21 +88,7 @@ public void testAMRestartWithExistingContainers() throws Exception { MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); int NUM_CONTAINERS = 3; - // allocate NUM_CONTAINERS containers - am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS, - new ArrayList()); - nm1.nodeHeartbeat(true); - - // wait for containers to be allocated. - List containers = - am1.allocate(new ArrayList(), - new ArrayList()).getAllocatedContainers(); - while (containers.size() != NUM_CONTAINERS) { - nm1.nodeHeartbeat(true); - containers.addAll(am1.allocate(new ArrayList(), - new ArrayList()).getAllocatedContainers()); - Thread.sleep(200); - } + allocateContainers(nm1, am1, NUM_CONTAINERS); // launch the 2nd container, for testing running container transferred. nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); @@ -244,6 +236,29 @@ public void testAMRestartWithExistingContainers() throws Exception { rm1.stop(); } + private List allocateContainers(MockNM nm1, MockAM am1, + int NUM_CONTAINERS) throws Exception { + // allocate NUM_CONTAINERS containers + am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS, + new ArrayList()); + nm1.nodeHeartbeat(true); + + // wait for containers to be allocated. + List containers = + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (containers.size() != NUM_CONTAINERS) { + nm1.nodeHeartbeat(true); + containers.addAll(am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(200); + } + + Assert.assertEquals("Did not get all containers allocated", + NUM_CONTAINERS, containers.size()); + return containers; + } + private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt) throws InterruptedException { int count = 0; @@ -355,6 +370,104 @@ public void testNMTokensRebindOnAMRestart() throws Exception { rm1.stop(); } + @Test(timeout = 100000) + public void testAMBlacklistPreventsRestartOnSameNode() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + final DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm1 = new MockRM(conf, memStore) { + @Override + protected EventHandler createSchedulerEventDispatcher() { + return new SchedulerEventDispatcher(this.scheduler) { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } + + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + + rm1.start(); + + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + + MockNM nm2 = + new MockNM("127.0.0.2:2345", 8000, rm1.getResourceTrackerService()); + nm2.registerNode(); + + RMApp app1 = rm1.submitApp(200); + + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + CapacityScheduler scheduler = + (CapacityScheduler) rm1.getResourceScheduler(); + ContainerId amContainer = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + // Preempt the first attempt; + RMContainer rmContainer = scheduler.getRMContainer(amContainer); + NodeId nodeWhereAMRan = rmContainer.getAllocatedNode(); + + scheduler.killContainer(rmContainer); + am1.waitForState(RMAppAttemptState.FAILED); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // restart the am + RMAppAttempt attempt = rm1.waitForAttemptScheduled(app1, rm1); + System.out.println("Launch AM " + attempt.getAppAttemptId()); + + MockNM currentNode, otherNode; + + if (nodeWhereAMRan == nm1.getNodeId()) { + currentNode = nm1; + otherNode = nm2; + } else { + currentNode = nm2; + otherNode = nm1; + } + + currentNode.nodeHeartbeat(true); + dispatcher.await(); + Assert.assertEquals( + "AppAttemptState should still be SCHEDULED if currentNode is " + + "blacklisted correctly", + RMAppAttemptState.SCHEDULED, + attempt.getAppAttemptState()); + + otherNode.nodeHeartbeat(true); + dispatcher.await(); + + MockAM am2 = rm1.sendAMLaunched(attempt.getAppAttemptId()); + rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); + + amContainer = + ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); + rmContainer = scheduler.getRMContainer(amContainer); + nodeWhereAMRan = rmContainer.getAllocatedNode(); + Assert.assertEquals( + "After blacklisting AM should have run on the other node", + otherNode.getNodeId(), nodeWhereAMRan); + + RegisterApplicationMasterResponse registerResponse = + am2.registerAppAttempt(); + rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + List allocatedContainers = + allocateContainers(currentNode, am2, 1); + Assert.assertEquals( + "Even though AM is blacklisted from the node, application can still " + + "allocate containers there", + currentNode.getNodeId(), allocatedContainers.get(0).getNodeId()); + } + // AM container preempted, nm disk failure // should not be counted towards AM max retry count. @Test(timeout = 100000) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java new file mode 100644 index 0000000..2c954ba --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.blacklist; + + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +public class TestBlacklistManager { + + @Test + public void testSimpleBlacklistBelowFailureThreshold() { + BlacklistManager manager = new SimpleBlacklistManager(3); + String anyNode = "foo"; + String anyNode2 = "bar"; + manager.addNodeContainerFailure(anyNode); + manager.addNodeContainerFailure(anyNode2); + BlacklistAdditionsRemovals blacklist = manager + .getBlacklistAdditionsRemovals(); + + List blacklistAdditions = blacklist.getBlacklistAdditions(); + Collections.sort(blacklistAdditions); + List blacklistRemovals = blacklist.getBlacklistRemovals(); + String[] expectedBlacklistAdditions = new String[]{anyNode2, anyNode}; + Assert.assertArrayEquals( + "Blacklist additions was not as expected", + expectedBlacklistAdditions, + blacklistAdditions.toArray()); + Assert.assertTrue( + "Blacklist removals should be empty but was " + + blacklistRemovals, + blacklistRemovals.isEmpty()); + } + + @Test + public void testSimpleBlacklistAboveFailureThreshold() { + // Create a threshold of 0.5 * 3 i.e at 1.5 node failures. + BlacklistManager manager = new SimpleBlacklistManager(3, 0.5); + String anyNode = "foo"; + String anyNode2 = "bar"; + manager.addNodeContainerFailure(anyNode); + BlacklistAdditionsRemovals blacklist = manager + .getBlacklistAdditionsRemovals(); + + List blacklistAdditions = blacklist.getBlacklistAdditions(); + Collections.sort(blacklistAdditions); + List blacklistRemovals = blacklist.getBlacklistRemovals(); + String[] expectedBlacklistAdditions = new String[]{anyNode}; + Assert.assertArrayEquals( + "Blacklist additions was not as expected", + expectedBlacklistAdditions, + blacklistAdditions.toArray()); + Assert.assertTrue( + "Blacklist removals should be empty but was " + + blacklistRemovals, + blacklistRemovals.isEmpty()); + + manager.addNodeContainerFailure(anyNode2); + + blacklist = manager + .getBlacklistAdditionsRemovals(); + blacklistAdditions = blacklist.getBlacklistAdditions(); + Collections.sort(blacklistAdditions); + blacklistRemovals = blacklist.getBlacklistRemovals(); + Collections.sort(blacklistRemovals); + String[] expectedBlacklistRemovals = new String[] {anyNode2, anyNode}; + Assert.assertTrue( + "Blacklist additions should be empty but was " + + blacklistAdditions, + blacklistAdditions.isEmpty()); + Assert.assertArrayEquals( + "Blacklist removals was not as expected", + expectedBlacklistRemovals, + blacklistRemovals.toArray()); + } + + @Test + public void testDisabledBlacklist() { + BlacklistManager disabled = new DisabledBlacklistManager(); + String anyNode = "foo"; + disabled.addNodeContainerFailure(anyNode); + BlacklistAdditionsRemovals blacklist = disabled + .getBlacklistAdditionsRemovals(); + + List blacklistAdditions = blacklist.getBlacklistAdditions(); + List blacklistRemovals = blacklist.getBlacklistRemovals(); + Assert.assertTrue( + "Blacklist additions should be empty but was " + + blacklistAdditions, + blacklistAdditions.isEmpty()); + Assert.assertTrue( + "Blacklist removals should be empty but was " + + blacklistRemovals, + blacklistRemovals.isEmpty()); + } +}