.../apache/hadoop/yarn/conf/YarnConfiguration.java | 29 ++++- .../src/main/resources/yarn-default.xml | 16 +++ .../resourcemanager/RMActiveServiceContext.java | 15 +++ .../yarn/server/resourcemanager/RMContext.java | 3 + .../yarn/server/resourcemanager/RMContextImpl.java | 13 ++ .../server/resourcemanager/ResourceManager.java | 10 ++ .../amcontainer/AMContainerAllocationExpirer.java | 88 ++++++++++++++ .../AMContainerAllocationExpirerInfo.java | 63 ++++++++++ .../AMContainerAllocationExpirerUtil.java | 108 +++++++++++++++++ .../server/resourcemanager/rmapp/RMAppImpl.java | 3 + .../rmapp/attempt/RMAppAttemptImpl.java | 3 + .../server/resourcemanager/rmnode/RMNodeImpl.java | 32 +++++ .../TestAMContainerAllocationExpirer.java | 133 +++++++++++++++++++++ .../rmapp/attempt/TestRMAppAttemptTransitions.java | 8 +- 14 files changed, 521 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a8a87ad8c9c..24eb89c5e66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -558,7 +558,19 @@ public static boolean isAclEnabled(Configuration conf) { public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = RM_PREFIX + "rm.container-allocation.expiry-interval-ms"; public static final int DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = 600000; - + + /** How long to wait for AM container allocation default 15 minutes */ + public static final String AM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = RM_PREFIX + + "am.container-allocation.expiry-interval-ms"; + public static final int DEFAULT_AM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = 900000; + + /** + * AM Container Allocation Expire Enabled or not. By default enabled = false. + */ + public static final String AMCONTAINER_ALLOCATIONEXPIRY_ENABLED = RM_PREFIX + + "am.container-allocation.expiry.enabled"; + public static final boolean DEFAULT_AMCONTAINER_ALLOCATIONEXPIRY_ENABLED = false; + /** Path to file with nodes to include.*/ public static final String RM_NODES_INCLUDE_FILE_PATH = RM_PREFIX + "nodes.include-path"; @@ -4786,4 +4798,19 @@ public static long getSkipNodeInterval(Configuration conf) { public static void main(String[] args) throws Exception { new YarnConfiguration(new Configuration()).writeXml(System.out); } + + /** + * Returns true if am allocation expiry is enabled else false + * @param conf + * @return boolean + */ + public static boolean isAMContainerAllocationExpiryEnabled( + Configuration conf) { + if (null == conf) { + return false; + } + return conf.getBoolean( + YarnConfiguration.AMCONTAINER_ALLOCATIONEXPIRY_ENABLED, + YarnConfiguration.DEFAULT_AMCONTAINER_ALLOCATIONEXPIRY_ENABLED); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index f7d9fc1d2b0..2288c5106af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1139,6 +1139,22 @@ 600000 + + + AM container is not allocated for specified time then kill the application + + yarn.resourcemanager.am.container-allocation.expiry.enabled + false + + + + + The expiry interval for AM container + + yarn.resourcemanager.am.container-allocation.expiry-interval-ms + 900000 + + Flag to enable/disable resource profiles diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index f1b0c794031..c19528b58ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.amcontainer.AMContainerAllocationExpirer; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; @@ -91,6 +92,7 @@ private AMLivelinessMonitor amFinishingMonitor; private RMStateStore stateStore = null; private ContainerAllocationExpirer containerAllocationExpirer; + private AMContainerAllocationExpirer amContainerAllocationExpirer; private DelegationTokenRenewer delegationTokenRenewer; private AMRMTokenSecretManager amRMTokenSecretManager; private RMContainerTokenSecretManager containerTokenSecretManager; @@ -604,4 +606,17 @@ public Long getTokenSequenceNo() { public void incrTokenSequenceNo() { this.tokenSequenceNo.incrementAndGet(); } + + @Private + @Unstable + public AMContainerAllocationExpirer getAMContainerAllocationExpirer() { + return this.amContainerAllocationExpirer; + } + + @Private + @Unstable + void setAMContainerAllocationExpirer( + AMContainerAllocationExpirer amContainerAllocationExpirer) { + this.amContainerAllocationExpirer = amContainerAllocationExpirer; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 55420bd9270..c7e0aa31f69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.amcontainer.AMContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @@ -203,4 +204,6 @@ void setMultiNodeSortingManager( long getTokenSequenceNo(); void incrTokenSequenceNo(); + + AMContainerAllocationExpirer getAMContainerAllocationExpirer(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 7f10138494e..7675f5a51d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.amcontainer.AMContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @@ -678,4 +679,16 @@ public long getTokenSequenceNo() { public void incrTokenSequenceNo() { this.activeServiceContext.incrTokenSequenceNo(); } + + @VisibleForTesting + public void setAMContainerAllocationExpirer( + AMContainerAllocationExpirer amContainerAllocationExpirer) { + activeServiceContext + .setAMContainerAllocationExpirer(amContainerAllocationExpirer); + } + + @Override + public AMContainerAllocationExpirer getAMContainerAllocationExpirer() { + return activeServiceContext.getAMContainerAllocationExpirer(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index c315b335415..3f34b713082 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -95,6 +95,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.amcontainer.AMContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; @@ -655,6 +656,7 @@ protected static void validateConfigs(Configuration conf) { private EventHandler schedulerDispatcher; private ApplicationMasterLauncher applicationMasterLauncher; private ContainerAllocationExpirer containerAllocationExpirer; + private AMContainerAllocationExpirer amContainerAllocationExpirer; private ResourceManager rm; private boolean fromActive = false; private StandByTransitionRunnable standByTransitionRunnable; @@ -676,6 +678,14 @@ protected void serviceInit(Configuration configuration) throws Exception { addService(containerAllocationExpirer); rmContext.setContainerAllocationExpirer(containerAllocationExpirer); + if (YarnConfiguration.isAMContainerAllocationExpiryEnabled(conf)) { + amContainerAllocationExpirer = new AMContainerAllocationExpirer( + rmContext); + addService(amContainerAllocationExpirer); + ((RMContextImpl) rmContext) + .setAMContainerAllocationExpirer(amContainerAllocationExpirer); + } + AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor(); addService(amLivelinessMonitor); rmContext.setAMLivelinessMonitor(amLivelinessMonitor); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirer.java new file mode 100644 index 00000000000..28388e8dc56 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirer.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.amcontainer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor; + +@SuppressWarnings({ "unchecked", "rawtypes" }) +public class AMContainerAllocationExpirer + extends AbstractLivelinessMonitor { + + private static final Log LOG = LogFactory + .getLog(AMContainerAllocationExpirer.class); + private EventHandler dispatcher; + private RMContext rmContext; + private int expireIntervalInSec; + + public AMContainerAllocationExpirer(RMContext rmContext) { + super(AMContainerAllocationExpirer.class.getName()); + this.dispatcher = rmContext.getDispatcher().getEventHandler(); + this.rmContext = rmContext; + this.expireIntervalInSec = rmContext.getYarnConfiguration().getInt( + YarnConfiguration.AM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_AM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS) / 1000; + } + + public void serviceInit(Configuration conf) throws Exception { + int expireIntvl = conf.getInt( + YarnConfiguration.AM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_AM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS); + setExpireInterval(expireIntvl); + setMonitorInterval(expireIntvl / 3); + super.serviceInit(conf); + } + + @Override + protected void expire(AMContainerAllocationExpirerInfo info) { + ApplicationId appId = info.getApplicationId(); + RMApp app = rmContext.getRMApps().get(appId); + if (app == null) { + rmContext.getAMContainerAllocationExpirer() + .unregister(new AMContainerAllocationExpirerInfo(appId)); + return; + } + + String diagnostics = "Application is killed by ResourceManager as AM container is not" + + " allocated resources in the specified time out period " + + expireIntervalInSec + "."; + + if (LOG.isInfoEnabled()) { + LOG.info("Application " + appId + + " is going to kill by ResourceManager as AM container is not" + + " allocated resources in the specified time out period" + + expireIntervalInSec + "."); + } + + // Kill the app only if AM container still not allocated + if (null == app.getCurrentAppAttempt().getMasterContainer()) { + dispatcher + .handle(new RMAppEvent(appId, RMAppEventType.KILL, diagnostics)); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirerInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirerInfo.java new file mode 100644 index 00000000000..e8bf0614d6d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirerInfo.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.amcontainer; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +public class AMContainerAllocationExpirerInfo + implements Comparable { + + private final ApplicationId applicationId; + + public AMContainerAllocationExpirerInfo(ApplicationId applicationId) { + this.applicationId = applicationId; + } + + public ApplicationId getApplicationId() { + return this.applicationId; + } + + @Override + public int hashCode() { + return (this.applicationId.hashCode() << 16); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof AMContainerAllocationExpirerInfo)) { + return false; + } + return compareTo((AMContainerAllocationExpirerInfo) other) == 0; + } + + @Override + public int compareTo(AMContainerAllocationExpirerInfo other) { + if (other == null) { + return -1; + } + // Only need to compare applicationId. + return applicationId.compareTo(other.getApplicationId()); + } + + @Override + public String toString() { + return ""; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirerUtil.java new file mode 100644 index 00000000000..abaf6b96369 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirerUtil.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.amcontainer; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; + +/** + * Utility class to register and unregister app with + * AMContainerAllocationExpirer. + */ +public final class AMContainerAllocationExpirerUtil { + private static final Log LOG = LogFactory + .getLog(AMContainerAllocationExpirerUtil.class); + private static final AMContainerAllocationExpirerUtil NEW_INSTANCE = new AMContainerAllocationExpirerUtil(); + + private AMContainerAllocationExpirerUtil() { + + } + + public static AMContainerAllocationExpirerUtil getInstance() { + return NEW_INSTANCE; + } + + /** + * Register running apps with AMContainerAllocationExpirer. + * + * @param rmNode + */ + public void registerWithAMContainerAllocationExpirer(RMContext context, + List runningAppsList) { + if (!runningAppsList.isEmpty() + && (YarnConfiguration.isAMContainerAllocationExpiryEnabled( + context.getYarnConfiguration()))) { + for (ApplicationId appId : runningAppsList) { + context.getAMContainerAllocationExpirer() + .register(new AMContainerAllocationExpirerInfo(appId)); + if (LOG.isDebugEnabled()) { + LOG.debug(appId + " regisetered with " + + "AMContainerAllocationExpirer because of node unhealthy"); + } + } + } + } + + /** + * Unregister app with AMContainerAllocationExpirer, when app is killed. + * + * @param rmContext + * @param app + */ + public void unregister(RMContext rmContext, RMAppImpl app) { + if (RMAppState.KILLED.equals(app.getState()) + && YarnConfiguration.isAMContainerAllocationExpiryEnabled( + rmContext.getYarnConfiguration())) { + rmContext.getAMContainerAllocationExpirer().unregister( + new AMContainerAllocationExpirerInfo(app.getApplicationId())); + if (LOG.isDebugEnabled()) { + LOG.debug(app.getApplicationId() + + " unregistered with AMContainerAllocationExpirer"); + } + } + } + + /** + * Unregistering app from AMContainerAllocationExpirer, after AM container is + * allocated successfully. + * + * @param rmContext + * @param appAttempt + */ + public void unregister(RMContext rmContext, RMAppAttemptImpl appAttempt) { + if (YarnConfiguration.isAMContainerAllocationExpiryEnabled( + rmContext.getYarnConfiguration())) { + rmContext.getAMContainerAllocationExpirer() + .unregister(new AMContainerAllocationExpirerInfo( + appAttempt.getAppAttemptId().getApplicationId())); + if (LOG.isDebugEnabled()) { + LOG.debug(appAttempt.getAppAttemptId() + + " unregistered with AMContainerAllocationExpirer"); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 94f7bb97bfe..ff97bf72afc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.amcontainer.AMContainerAllocationExpirerUtil; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager; @@ -1339,6 +1340,8 @@ public FinalSavingTransition(Object transitionToDo, @Override public void transition(RMAppImpl app, RMAppEvent event) { + AMContainerAllocationExpirerUtil.getInstance().unregister(app.rmContext, + app); app.rememberTargetTransitionsAndStoreState(event, transitionToDo, targetedFinalState, stateToBeStored); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 2d6de3750e9..ae9e6b6af6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.amcontainer.AMContainerAllocationExpirerUtil; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; @@ -1232,6 +1233,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, ClusterMetrics.getMetrics().addAMContainerAllocationDelay( allocationDelay); appAttempt.storeAttempt(); + AMContainerAllocationExpirerUtil.getInstance() + .unregister(appAttempt.rmContext, appAttempt); return RMAppAttemptState.ALLOCATED_SAVING; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index cec9915e0d1..a7c9a7573a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.amcontainer.AMContainerAllocationExpirerUtil; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; @@ -1365,6 +1366,18 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { LOG.info("Node " + rmNode.nodeId + " reported UNHEALTHY with details: " + remoteNodeHealthStatus.getHealthReport()); + List runningApps = rmNode.getRunningApps(); + Iterator iterator = runningApps.iterator(); + while (iterator.hasNext()) { + ApplicationId appId = iterator.next(); + if (!isAMLaunchedInSameRMNode(rmNode, appId)) { + iterator.remove(); + } + } + AMContainerAllocationExpirerUtil.getInstance() + .registerWithAMContainerAllocationExpirer(rmNode.context, + runningApps); + // if a node in decommissioning receives an unhealthy report, // it will stay in decommissioning. if (isNodeDecommissioning) { @@ -1401,6 +1414,25 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { return initialState; } + + /** + * Return true if AM launched in the same rmNode Else return false + * + * @param rmNode + * @param appId + * @return + */ + private boolean isAMLaunchedInSameRMNode(RMNodeImpl rmNode, + ApplicationId appId) { + RMApp rmApp = rmNode.context.getRMApps().get(appId); + if (null != rmApp + && null != rmApp.getCurrentAppAttempt().getMasterContainer() + && rmNode.getNodeID().equals( + rmApp.getCurrentAppAttempt().getMasterContainer().getNodeId())) { + return true; + } + return false; + } } public static class StatusUpdateWhenUnHealthyTransition implements diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/TestAMContainerAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/TestAMContainerAllocationExpirer.java new file mode 100644 index 00000000000..0126adaa3dc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/TestAMContainerAllocationExpirer.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.amcontainer; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData; +import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test class for AM container allocation expire. Scenario is: when APP is + * running normally, node will become unhealthy. So AM container exit and RM try + * to launch AM for second attempt. In case, AM container (Only for previously + * running apps in the unhealthy node) for second attempt is not allocated for + * the specified timeout, then need to kill app. + */ +public class TestAMContainerAllocationExpirer { + private final int GB = 1024; + private YarnConfiguration conf; + RMNodeLabelsManager mgr; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + } + + @Test(timeout = 90000) + public void testAppKilledWhenTimeoutExpire() throws Exception { + // This is to test register flow + testAppKilledWhenAMContainerIsNotAllocated(false); + } + + @Test(timeout = 900000) + public void testAppKilledBeforeTimeoutExpire() throws Exception { + // This is to test unregister flow by killing app before time out + testAppKilledWhenAMContainerIsNotAllocated(true); + } + + private void testAppKilledWhenAMContainerIsNotAllocated( + boolean isKillAppForcefully) + throws Exception, InterruptedException, IOException { + // Enable AM Container Allocation expire feature + // Set AM container allocation expire interval to 15 seconds + conf.setBoolean(YarnConfiguration.AMCONTAINER_ALLOCATIONEXPIRY_ENABLED, + true); + conf.setLong(YarnConfiguration.AM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, + 15000); + + // Start mock RM + MockRM rm = new MockRM(conf); + rm.start(); + + // Submit an application + MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder + .createWithMemory(1024, rm).withAppName("app").withUser("user") + .withAcls(null).withQueue("default").build(); + RMApp app = MockRMAppSubmitter.submit(rm, data); + + // Register new NM with RM + MockNM nm = rm.registerNode("127.0.0.1:1234", 4 * GB); + + // Launch AM on NM for the submitted app + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm); + + // AM request a new container + am.allocate("127.0.0.1", 1 * GB, 1, new ArrayList()); + ContainerId containerId = ContainerId + .newContainerId(am.getApplicationAttemptId(), 2); + rm.waitForState(nm, containerId, RMContainerState.ALLOCATED); + + // Report container status RUNNING + nm.nodeHeartbeat(app.getCurrentAppAttempt().getAppAttemptId(), 2, + ContainerState.RUNNING); + + // Now stop NM heart beats to report NM as unhealthy + nm.nodeHeartbeat(false); + + // If isKillAppForcefully = true kill app before timeout expire + if (isKillAppForcefully) { + ((AbstractYarnScheduler) rm.getResourceScheduler()) + .killAllAppsInQueue("default"); + } + + // Wait for application to be killed by AMContainerAllocationExpirer + rm.waitForState(app.getApplicationId(), RMAppState.KILLED); + + // Final application status must be FinalApplicationStatus.KILLED + Assert.assertEquals(FinalApplicationStatus.KILLED, + app.getFinalApplicationStatus()); + + // Close/Stop rm + rm.close(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 4e5ff3f7687..eff69392689 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.amcontainer.AMContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; @@ -245,7 +246,9 @@ public void setUp() throws Exception { SecurityUtil.setAuthenticationMethod(authMethod, conf); UserGroupInformation.setConfiguration(conf); InlineDispatcher rmDispatcher = new InlineDispatcher(); - + + AMContainerAllocationExpirer amContainerAllocationExpirer = mock( + AMContainerAllocationExpirer.class); ContainerAllocationExpirer containerAllocationExpirer = mock(ContainerAllocationExpirer.class); amLivelinessMonitor = mock(AMLivelinessMonitor.class); @@ -260,7 +263,8 @@ public void setUp() throws Exception { new RMContainerTokenSecretManager(conf), nmTokenManager, clientToAMTokenManager); - + ((RMContextImpl) rmContext) + .setAMContainerAllocationExpirer(amContainerAllocationExpirer); store = mock(RMStateStore.class); ((RMContextImpl) rmContext).setStateStore(store); publisher = mock(SystemMetricsPublisher.class);