.../apache/hadoop/yarn/conf/YarnConfiguration.java | 29 ++++-
.../src/main/resources/yarn-default.xml | 16 +++
.../resourcemanager/RMActiveServiceContext.java | 15 +++
.../yarn/server/resourcemanager/RMContext.java | 3 +
.../yarn/server/resourcemanager/RMContextImpl.java | 13 ++
.../server/resourcemanager/ResourceManager.java | 10 ++
.../amcontainer/AMContainerAllocationExpirer.java | 88 ++++++++++++++
.../AMContainerAllocationExpirerInfo.java | 63 ++++++++++
.../AMContainerAllocationExpirerUtil.java | 108 +++++++++++++++++
.../server/resourcemanager/rmapp/RMAppImpl.java | 3 +
.../rmapp/attempt/RMAppAttemptImpl.java | 3 +
.../server/resourcemanager/rmnode/RMNodeImpl.java | 32 +++++
.../TestAMContainerAllocationExpirer.java | 133 +++++++++++++++++++++
.../rmapp/attempt/TestRMAppAttemptTransitions.java | 8 +-
14 files changed, 521 insertions(+), 3 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a8a87ad8c9c..24eb89c5e66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -558,7 +558,19 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS =
RM_PREFIX + "rm.container-allocation.expiry-interval-ms";
public static final int DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = 600000;
-
+
+ /** How long to wait for AM container allocation default 15 minutes */
+ public static final String AM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = RM_PREFIX
+ + "am.container-allocation.expiry-interval-ms";
+ public static final int DEFAULT_AM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = 900000;
+
+ /**
+ * AM Container Allocation Expire Enabled or not. By default enabled = false.
+ */
+ public static final String AMCONTAINER_ALLOCATIONEXPIRY_ENABLED = RM_PREFIX
+ + "am.container-allocation.expiry.enabled";
+ public static final boolean DEFAULT_AMCONTAINER_ALLOCATIONEXPIRY_ENABLED = false;
+
/** Path to file with nodes to include.*/
public static final String RM_NODES_INCLUDE_FILE_PATH =
RM_PREFIX + "nodes.include-path";
@@ -4786,4 +4798,19 @@ public static long getSkipNodeInterval(Configuration conf) {
public static void main(String[] args) throws Exception {
new YarnConfiguration(new Configuration()).writeXml(System.out);
}
+
+ /**
+ * Returns true if am allocation expiry is enabled else false
+ * @param conf
+ * @return boolean
+ */
+ public static boolean isAMContainerAllocationExpiryEnabled(
+ Configuration conf) {
+ if (null == conf) {
+ return false;
+ }
+ return conf.getBoolean(
+ YarnConfiguration.AMCONTAINER_ALLOCATIONEXPIRY_ENABLED,
+ YarnConfiguration.DEFAULT_AMCONTAINER_ALLOCATIONEXPIRY_ENABLED);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index f7d9fc1d2b0..2288c5106af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1139,6 +1139,22 @@
600000
+
+
+ AM container is not allocated for specified time then kill the application
+
+ yarn.resourcemanager.am.container-allocation.expiry.enabled
+ false
+
+
+
+
+ The expiry interval for AM container
+
+ yarn.resourcemanager.am.container-allocation.expiry-interval-ms
+ 900000
+
+
Flag to enable/disable resource profiles
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index f1b0c794031..c19528b58ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.amcontainer.AMContainerAllocationExpirer;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
@@ -91,6 +92,7 @@
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore stateStore = null;
private ContainerAllocationExpirer containerAllocationExpirer;
+ private AMContainerAllocationExpirer amContainerAllocationExpirer;
private DelegationTokenRenewer delegationTokenRenewer;
private AMRMTokenSecretManager amRMTokenSecretManager;
private RMContainerTokenSecretManager containerTokenSecretManager;
@@ -604,4 +606,17 @@ public Long getTokenSequenceNo() {
public void incrTokenSequenceNo() {
this.tokenSequenceNo.incrementAndGet();
}
+
+ @Private
+ @Unstable
+ public AMContainerAllocationExpirer getAMContainerAllocationExpirer() {
+ return this.amContainerAllocationExpirer;
+ }
+
+ @Private
+ @Unstable
+ void setAMContainerAllocationExpirer(
+ AMContainerAllocationExpirer amContainerAllocationExpirer) {
+ this.amContainerAllocationExpirer = amContainerAllocationExpirer;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 55420bd9270..c7e0aa31f69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.amcontainer.AMContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -203,4 +204,6 @@ void setMultiNodeSortingManager(
long getTokenSequenceNo();
void incrTokenSequenceNo();
+
+ AMContainerAllocationExpirer getAMContainerAllocationExpirer();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 7f10138494e..7675f5a51d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.amcontainer.AMContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -678,4 +679,16 @@ public long getTokenSequenceNo() {
public void incrTokenSequenceNo() {
this.activeServiceContext.incrTokenSequenceNo();
}
+
+ @VisibleForTesting
+ public void setAMContainerAllocationExpirer(
+ AMContainerAllocationExpirer amContainerAllocationExpirer) {
+ activeServiceContext
+ .setAMContainerAllocationExpirer(amContainerAllocationExpirer);
+ }
+
+ @Override
+ public AMContainerAllocationExpirer getAMContainerAllocationExpirer() {
+ return activeServiceContext.getAMContainerAllocationExpirer();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index c315b335415..3f34b713082 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -95,6 +95,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.amcontainer.AMContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -655,6 +656,7 @@ protected static void validateConfigs(Configuration conf) {
private EventHandler schedulerDispatcher;
private ApplicationMasterLauncher applicationMasterLauncher;
private ContainerAllocationExpirer containerAllocationExpirer;
+ private AMContainerAllocationExpirer amContainerAllocationExpirer;
private ResourceManager rm;
private boolean fromActive = false;
private StandByTransitionRunnable standByTransitionRunnable;
@@ -676,6 +678,14 @@ protected void serviceInit(Configuration configuration) throws Exception {
addService(containerAllocationExpirer);
rmContext.setContainerAllocationExpirer(containerAllocationExpirer);
+ if (YarnConfiguration.isAMContainerAllocationExpiryEnabled(conf)) {
+ amContainerAllocationExpirer = new AMContainerAllocationExpirer(
+ rmContext);
+ addService(amContainerAllocationExpirer);
+ ((RMContextImpl) rmContext)
+ .setAMContainerAllocationExpirer(amContainerAllocationExpirer);
+ }
+
AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
addService(amLivelinessMonitor);
rmContext.setAMLivelinessMonitor(amLivelinessMonitor);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirer.java
new file mode 100644
index 00000000000..28388e8dc56
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.amcontainer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class AMContainerAllocationExpirer
+ extends AbstractLivelinessMonitor {
+
+ private static final Log LOG = LogFactory
+ .getLog(AMContainerAllocationExpirer.class);
+ private EventHandler dispatcher;
+ private RMContext rmContext;
+ private int expireIntervalInSec;
+
+ public AMContainerAllocationExpirer(RMContext rmContext) {
+ super(AMContainerAllocationExpirer.class.getName());
+ this.dispatcher = rmContext.getDispatcher().getEventHandler();
+ this.rmContext = rmContext;
+ this.expireIntervalInSec = rmContext.getYarnConfiguration().getInt(
+ YarnConfiguration.AM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_AM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS) / 1000;
+ }
+
+ public void serviceInit(Configuration conf) throws Exception {
+ int expireIntvl = conf.getInt(
+ YarnConfiguration.AM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_AM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS);
+ setExpireInterval(expireIntvl);
+ setMonitorInterval(expireIntvl / 3);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void expire(AMContainerAllocationExpirerInfo info) {
+ ApplicationId appId = info.getApplicationId();
+ RMApp app = rmContext.getRMApps().get(appId);
+ if (app == null) {
+ rmContext.getAMContainerAllocationExpirer()
+ .unregister(new AMContainerAllocationExpirerInfo(appId));
+ return;
+ }
+
+ String diagnostics = "Application is killed by ResourceManager as AM container is not"
+ + " allocated resources in the specified time out period "
+ + expireIntervalInSec + ".";
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Application " + appId
+ + " is going to kill by ResourceManager as AM container is not"
+ + " allocated resources in the specified time out period"
+ + expireIntervalInSec + ".");
+ }
+
+ // Kill the app only if AM container still not allocated
+ if (null == app.getCurrentAppAttempt().getMasterContainer()) {
+ dispatcher
+ .handle(new RMAppEvent(appId, RMAppEventType.KILL, diagnostics));
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirerInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirerInfo.java
new file mode 100644
index 00000000000..e8bf0614d6d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirerInfo.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.amcontainer;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public class AMContainerAllocationExpirerInfo
+ implements Comparable {
+
+ private final ApplicationId applicationId;
+
+ public AMContainerAllocationExpirerInfo(ApplicationId applicationId) {
+ this.applicationId = applicationId;
+ }
+
+ public ApplicationId getApplicationId() {
+ return this.applicationId;
+ }
+
+ @Override
+ public int hashCode() {
+ return (this.applicationId.hashCode() << 16);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof AMContainerAllocationExpirerInfo)) {
+ return false;
+ }
+ return compareTo((AMContainerAllocationExpirerInfo) other) == 0;
+ }
+
+ @Override
+ public int compareTo(AMContainerAllocationExpirerInfo other) {
+ if (other == null) {
+ return -1;
+ }
+ // Only need to compare applicationId.
+ return applicationId.compareTo(other.getApplicationId());
+ }
+
+ @Override
+ public String toString() {
+ return "";
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirerUtil.java
new file mode 100644
index 00000000000..abaf6b96369
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/AMContainerAllocationExpirerUtil.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.amcontainer;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+
+/**
+ * Utility class to register and unregister app with
+ * AMContainerAllocationExpirer.
+ */
+public final class AMContainerAllocationExpirerUtil {
+ private static final Log LOG = LogFactory
+ .getLog(AMContainerAllocationExpirerUtil.class);
+ private static final AMContainerAllocationExpirerUtil NEW_INSTANCE = new AMContainerAllocationExpirerUtil();
+
+ private AMContainerAllocationExpirerUtil() {
+
+ }
+
+ public static AMContainerAllocationExpirerUtil getInstance() {
+ return NEW_INSTANCE;
+ }
+
+ /**
+ * Register running apps with AMContainerAllocationExpirer.
+ *
+ * @param rmNode
+ */
+ public void registerWithAMContainerAllocationExpirer(RMContext context,
+ List runningAppsList) {
+ if (!runningAppsList.isEmpty()
+ && (YarnConfiguration.isAMContainerAllocationExpiryEnabled(
+ context.getYarnConfiguration()))) {
+ for (ApplicationId appId : runningAppsList) {
+ context.getAMContainerAllocationExpirer()
+ .register(new AMContainerAllocationExpirerInfo(appId));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(appId + " regisetered with "
+ + "AMContainerAllocationExpirer because of node unhealthy");
+ }
+ }
+ }
+ }
+
+ /**
+ * Unregister app with AMContainerAllocationExpirer, when app is killed.
+ *
+ * @param rmContext
+ * @param app
+ */
+ public void unregister(RMContext rmContext, RMAppImpl app) {
+ if (RMAppState.KILLED.equals(app.getState())
+ && YarnConfiguration.isAMContainerAllocationExpiryEnabled(
+ rmContext.getYarnConfiguration())) {
+ rmContext.getAMContainerAllocationExpirer().unregister(
+ new AMContainerAllocationExpirerInfo(app.getApplicationId()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(app.getApplicationId()
+ + " unregistered with AMContainerAllocationExpirer");
+ }
+ }
+ }
+
+ /**
+ * Unregistering app from AMContainerAllocationExpirer, after AM container is
+ * allocated successfully.
+ *
+ * @param rmContext
+ * @param appAttempt
+ */
+ public void unregister(RMContext rmContext, RMAppAttemptImpl appAttempt) {
+ if (YarnConfiguration.isAMContainerAllocationExpiryEnabled(
+ rmContext.getYarnConfiguration())) {
+ rmContext.getAMContainerAllocationExpirer()
+ .unregister(new AMContainerAllocationExpirerInfo(
+ appAttempt.getAppAttemptId().getApplicationId()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(appAttempt.getAppAttemptId()
+ + " unregistered with AMContainerAllocationExpirer");
+ }
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 94f7bb97bfe..ff97bf72afc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -77,6 +77,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.amcontainer.AMContainerAllocationExpirerUtil;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;
@@ -1339,6 +1340,8 @@ public FinalSavingTransition(Object transitionToDo,
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
+ AMContainerAllocationExpirerUtil.getInstance().unregister(app.rmContext,
+ app);
app.rememberTargetTransitionsAndStoreState(event, transitionToDo,
targetedFinalState, stateToBeStored);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 2d6de3750e9..ae9e6b6af6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -72,6 +72,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.amcontainer.AMContainerAllocationExpirerUtil;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
@@ -1232,6 +1233,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
ClusterMetrics.getMetrics().addAMContainerAllocationDelay(
allocationDelay);
appAttempt.storeAttempt();
+ AMContainerAllocationExpirerUtil.getInstance()
+ .unregister(appAttempt.rmContext, appAttempt);
return RMAppAttemptState.ALLOCATED_SAVING;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index cec9915e0d1..a7c9a7573a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -74,6 +74,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.amcontainer.AMContainerAllocationExpirerUtil;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@@ -1365,6 +1366,18 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
LOG.info("Node " + rmNode.nodeId +
" reported UNHEALTHY with details: " +
remoteNodeHealthStatus.getHealthReport());
+ List runningApps = rmNode.getRunningApps();
+ Iterator iterator = runningApps.iterator();
+ while (iterator.hasNext()) {
+ ApplicationId appId = iterator.next();
+ if (!isAMLaunchedInSameRMNode(rmNode, appId)) {
+ iterator.remove();
+ }
+ }
+ AMContainerAllocationExpirerUtil.getInstance()
+ .registerWithAMContainerAllocationExpirer(rmNode.context,
+ runningApps);
+
// if a node in decommissioning receives an unhealthy report,
// it will stay in decommissioning.
if (isNodeDecommissioning) {
@@ -1401,6 +1414,25 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
return initialState;
}
+
+ /**
+ * Return true if AM launched in the same rmNode Else return false
+ *
+ * @param rmNode
+ * @param appId
+ * @return
+ */
+ private boolean isAMLaunchedInSameRMNode(RMNodeImpl rmNode,
+ ApplicationId appId) {
+ RMApp rmApp = rmNode.context.getRMApps().get(appId);
+ if (null != rmApp
+ && null != rmApp.getCurrentAppAttempt().getMasterContainer()
+ && rmNode.getNodeID().equals(
+ rmApp.getCurrentAppAttempt().getMasterContainer().getNodeId())) {
+ return true;
+ }
+ return false;
+ }
}
public static class StatusUpdateWhenUnHealthyTransition implements
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/TestAMContainerAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/TestAMContainerAllocationExpirer.java
new file mode 100644
index 00000000000..0126adaa3dc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/amcontainer/TestAMContainerAllocationExpirer.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.amcontainer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class for AM container allocation expire. Scenario is: when APP is
+ * running normally, node will become unhealthy. So AM container exit and RM try
+ * to launch AM for second attempt. In case, AM container (Only for previously
+ * running apps in the unhealthy node) for second attempt is not allocated for
+ * the specified timeout, then need to kill app.
+ */
+public class TestAMContainerAllocationExpirer {
+ private final int GB = 1024;
+ private YarnConfiguration conf;
+ RMNodeLabelsManager mgr;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+ }
+
+ @Test(timeout = 90000)
+ public void testAppKilledWhenTimeoutExpire() throws Exception {
+ // This is to test register flow
+ testAppKilledWhenAMContainerIsNotAllocated(false);
+ }
+
+ @Test(timeout = 900000)
+ public void testAppKilledBeforeTimeoutExpire() throws Exception {
+ // This is to test unregister flow by killing app before time out
+ testAppKilledWhenAMContainerIsNotAllocated(true);
+ }
+
+ private void testAppKilledWhenAMContainerIsNotAllocated(
+ boolean isKillAppForcefully)
+ throws Exception, InterruptedException, IOException {
+ // Enable AM Container Allocation expire feature
+ // Set AM container allocation expire interval to 15 seconds
+ conf.setBoolean(YarnConfiguration.AMCONTAINER_ALLOCATIONEXPIRY_ENABLED,
+ true);
+ conf.setLong(YarnConfiguration.AM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
+ 15000);
+
+ // Start mock RM
+ MockRM rm = new MockRM(conf);
+ rm.start();
+
+ // Submit an application
+ MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder
+ .createWithMemory(1024, rm).withAppName("app").withUser("user")
+ .withAcls(null).withQueue("default").build();
+ RMApp app = MockRMAppSubmitter.submit(rm, data);
+
+ // Register new NM with RM
+ MockNM nm = rm.registerNode("127.0.0.1:1234", 4 * GB);
+
+ // Launch AM on NM for the submitted app
+ MockAM am = MockRM.launchAndRegisterAM(app, rm, nm);
+
+ // AM request a new container
+ am.allocate("127.0.0.1", 1 * GB, 1, new ArrayList());
+ ContainerId containerId = ContainerId
+ .newContainerId(am.getApplicationAttemptId(), 2);
+ rm.waitForState(nm, containerId, RMContainerState.ALLOCATED);
+
+ // Report container status RUNNING
+ nm.nodeHeartbeat(app.getCurrentAppAttempt().getAppAttemptId(), 2,
+ ContainerState.RUNNING);
+
+ // Now stop NM heart beats to report NM as unhealthy
+ nm.nodeHeartbeat(false);
+
+ // If isKillAppForcefully = true kill app before timeout expire
+ if (isKillAppForcefully) {
+ ((AbstractYarnScheduler) rm.getResourceScheduler())
+ .killAllAppsInQueue("default");
+ }
+
+ // Wait for application to be killed by AMContainerAllocationExpirer
+ rm.waitForState(app.getApplicationId(), RMAppState.KILLED);
+
+ // Final application status must be FinalApplicationStatus.KILLED
+ Assert.assertEquals(FinalApplicationStatus.KILLED,
+ app.getFinalApplicationStatus());
+
+ // Close/Stop rm
+ rm.close();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index 4e5ff3f7687..eff69392689 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -75,6 +75,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.amcontainer.AMContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
@@ -245,7 +246,9 @@ public void setUp() throws Exception {
SecurityUtil.setAuthenticationMethod(authMethod, conf);
UserGroupInformation.setConfiguration(conf);
InlineDispatcher rmDispatcher = new InlineDispatcher();
-
+
+ AMContainerAllocationExpirer amContainerAllocationExpirer = mock(
+ AMContainerAllocationExpirer.class);
ContainerAllocationExpirer containerAllocationExpirer =
mock(ContainerAllocationExpirer.class);
amLivelinessMonitor = mock(AMLivelinessMonitor.class);
@@ -260,7 +263,8 @@ public void setUp() throws Exception {
new RMContainerTokenSecretManager(conf),
nmTokenManager,
clientToAMTokenManager);
-
+ ((RMContextImpl) rmContext)
+ .setAMContainerAllocationExpirer(amContainerAllocationExpirer);
store = mock(RMStateStore.class);
((RMContextImpl) rmContext).setStateStore(store);
publisher = mock(SystemMetricsPublisher.class);