.../api/records/ApplicationSubmissionContext.java | 20 +++
.../apache/hadoop/yarn/conf/YarnConfiguration.java | 5 +
.../src/main/proto/yarn_protos.proto | 1 +
.../pb/ApplicationSubmissionContextPBImpl.java | 15 ++
.../src/main/resources/yarn-default.xml | 9 +
.../resourcemanager/RMActiveServiceContext.java | 11 ++
.../yarn/server/resourcemanager/RMContext.java | 5 +
.../yarn/server/resourcemanager/RMContextImpl.java | 11 ++
.../server/resourcemanager/ResourceManager.java | 10 ++
.../server/resourcemanager/rmapp/RMAppImpl.java | 27 +++
.../rmapp/attempt/RMAppLifeTimeMonitorService.java | 186 +++++++++++++++++++++
.../hadoop/yarn/server/resourcemanager/MockRM.java | 30 +++-
.../TestApplicationLifeTimeMonitorService.java | 162 ++++++++++++++++++
13 files changed, 490 insertions(+), 2 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
index 21cd1bb..41528fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
@@ -535,4 +535,24 @@ public abstract void setLogAggregationContext(
@Public
@Unstable
public abstract void setReservationID(ReservationId reservationID);
+
+ /**
+ * Get the application lifetime value.
+ * The application will be killed if is not completed in the given time.
+ *
+ * @return application lifetime value.
+ */
+ @Public
+ @Unstable
+ public abstract long getApplicationLifeTime();
+
+ /**
+ * Set the application lifetime value.
+ * The application will be killed if is not completed in the given time.
+ *
+ * @param applicationLifeTime application lifetime value
+ */
+ @Public
+ @Unstable
+ public abstract void setApplicationLifeTime(long applicationLifeTime);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 33e8a1f..a79ed0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2037,6 +2037,11 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) {
YARN_PREFIX + "am.blacklisting.disable-failure-threshold";
public static final float DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD = 0.8f;
+ // Configurations for applicaiton life time monitor feature
+ public static final String RM_LIFE_TIME_MONITOR_INTERVAL_SEC = YARN_PREFIX
+ + "app.lifetime.monitor.interval-sec";
+
+ public static final long DEFAULT_RM_APP_LIFE_TIME_MONITOR_INTERVAL_SEC = 60L;
public YarnConfiguration() {
super();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 057aeee..929409d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -338,6 +338,7 @@ message ApplicationSubmissionContextProto {
optional ReservationIdProto reservation_id = 15;
optional string node_label_expression = 16;
optional ResourceRequestProto am_container_resource_request = 17;
+ optional int64 app_lifetime_value = 18 [default = -1];
}
message LogAggregationContextProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
index 67e3a84..95469f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
@@ -548,4 +548,19 @@ private ReservationIdPBImpl convertFromProtoFormat(ReservationIdProto p) {
private ReservationIdProto convertToProtoFormat(ReservationId t) {
return ((ReservationIdPBImpl) t).getProto();
}
+
+ @Override
+ public long getApplicationLifeTime() {
+ ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasAppLifetimeValue()) {
+ return -1;
+ }
+ return (p.getAppLifetimeValue());
+ }
+
+ @Override
+ public void setApplicationLifeTime(long appLifeTimeValue) {
+ maybeInitBuilder();
+ builder.setAppLifetimeValue(appLifeTimeValue);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index bcd64c3..6420695 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2311,4 +2311,13 @@
yarn.am.blacklisting.disable-failure-threshold
0.8f
+
+
+
+ The RMAppLifeTimeMonitorService Monitor interval.
+
+ yarn.app.lifetime.monitor.interval-sec
+ 60
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index c71323f..2c0c928 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppLifeTimeMonitorService;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -102,6 +103,8 @@
private boolean isSchedulerReady = false;
private PlacementManager queuePlacementManager = null;
+ private RMAppLifeTimeMonitorService rmAppLifeTimeMonitor;
+
public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager();
}
@@ -438,4 +441,12 @@ public PlacementManager getQueuePlacementManager() {
public void setQueuePlacementManager(PlacementManager placementMgr) {
this.queuePlacementManager = placementMgr;
}
+
+ public void setRMAppLifeTimeMonitor(RMAppLifeTimeMonitorService rmAppLifeTimeMonitor) {
+ this.rmAppLifeTimeMonitor = rmAppLifeTimeMonitor;
+ }
+
+ public RMAppLifeTimeMonitorService getRMAppTimeOutMonitor() {
+ return this.rmAppLifeTimeMonitor;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index b64c834..453f27f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppLifeTimeMonitorService;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -129,4 +130,8 @@ void setRMApplicationHistoryWriter(
PlacementManager getQueuePlacementManager();
void setQueuePlacementManager(PlacementManager placementMgr);
+
+ void setRMAppLifeTimeMonitor(RMAppLifeTimeMonitorService rmAppLifeTimeMonitor);
+
+ RMAppLifeTimeMonitorService getRMAppLifeTimeMonitor();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 840cea7..25e435d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppLifeTimeMonitorService;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -448,4 +449,14 @@ public PlacementManager getQueuePlacementManager() {
public void setQueuePlacementManager(PlacementManager placementMgr) {
this.activeServiceContext.setQueuePlacementManager(placementMgr);
}
+
+ @Override
+ public void setRMAppLifeTimeMonitor(RMAppLifeTimeMonitorService rmAppLifeTimeMonitor) {
+ this.activeServiceContext.setRMAppLifeTimeMonitor(rmAppLifeTimeMonitor);
+ }
+
+ @Override
+ public RMAppLifeTimeMonitorService getRMAppLifeTimeMonitor() {
+ return this.activeServiceContext.getRMAppTimeOutMonitor();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index d1f339a..aa227e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -76,6 +76,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppLifeTimeMonitorService;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
@@ -408,6 +409,7 @@ protected static void validateConfigs(Configuration conf) {
private ResourceManager rm;
private boolean recoveryEnabled;
private RMActiveServiceContext activeServiceContext;
+ protected RMAppLifeTimeMonitorService rmAppLifeTimeMonitor;
RMActiveServices(ResourceManager rm) {
super("RMActiveServices");
@@ -435,6 +437,10 @@ protected void serviceInit(Configuration configuration) throws Exception {
addService(amFinishingMonitor);
rmContext.setAMFinishingMonitor(amFinishingMonitor);
+ rmAppLifeTimeMonitor = createRMAppLifeTimeMonitor();
+ addService(rmAppLifeTimeMonitor);
+ rmContext.setRMAppLifeTimeMonitor(rmAppLifeTimeMonitor);
+
RMNodeLabelsManager nlm = createNodeLabelManager();
nlm.setRMContext(rmContext);
addService(nlm);
@@ -1284,4 +1290,8 @@ private static void printUsage(PrintStream out) {
out.println(" "
+ "[-remove-application-from-state-store ]" + "\n");
}
+
+ protected RMAppLifeTimeMonitorService createRMAppLifeTimeMonitor() {
+ return new RMAppLifeTimeMonitorService(this.rmContext);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 42d889e..d24eb91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -959,6 +959,19 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
}
}
+ // Handle the application lifetime value during recovery
+ long applicationLifeTime = app.submissionContext.getApplicationLifeTime();
+ if (applicationLifeTime > 0) {
+ app.rmContext.getRMAppLifeTimeMonitor().register(
+ app.getApplicationId(),
+ app.submissionContext.getApplicationLifeTime() * 1000);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Application " + app.getApplicationId()
+ + " is registered with Application lifetime monitor. "
+ + "The lifetime configured is "
+ + app.submissionContext.getApplicationLifeTime() + " seconds");
+ }
+ }
// No existent attempts means the attempt associated with this app was not
// started or started but not yet saved.
if (app.attempts.isEmpty()) {
@@ -998,6 +1011,20 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
public void transition(RMAppImpl app, RMAppEvent event) {
app.handler.handle(new AppAddedSchedulerEvent(app.user,
app.submissionContext, false));
+
+ // Set the application lifetime value
+ long applicationLifeTime = app.submissionContext.getApplicationLifeTime();
+ if (applicationLifeTime > 0) {
+ app.rmContext.getRMAppLifeTimeMonitor().register(
+ app.getApplicationId(), applicationLifeTime * 1000);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Application "
+ + app.getApplicationId()
+ + " is registered with Application lifetime monitor. "
+ + "The lifetime configured is "
+ + app.submissionContext.getApplicationLifeTime() + " seconds");
+ }
+ }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppLifeTimeMonitorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppLifeTimeMonitorService.java
new file mode 100644
index 0000000..3bdfe37
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppLifeTimeMonitorService.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+/**
+ * This service will monitor the applications against the lifetime value given.
+ * The applications will be killed if it running beyond the given time.
+ */
+public class RMAppLifeTimeMonitorService extends AbstractService {
+
+ private static final Log LOG = LogFactory.getLog(RMAppLifeTimeMonitorService.class);
+
+ private RMContext rmContext;
+
+ private Clock clock;
+ private Thread rmAppLifeTimeMonitorThread = null;
+ private volatile boolean stopped = false;
+
+ private long monitoringInterval = 0;
+
+ private ConcurrentMap monitoredApps =
+ new ConcurrentHashMap();
+
+ public RMAppLifeTimeMonitorService(RMContext rmContext) {
+ super(RMAppLifeTimeMonitorService.class.getName());
+ this.rmContext = rmContext;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ stopped = false;
+ super.serviceInit(conf);
+ clock = new SystemClock();
+ long applicationTimeOurMonitorInterval =
+ conf.getLong(YarnConfiguration.RM_LIFE_TIME_MONITOR_INTERVAL_SEC,
+ YarnConfiguration.DEFAULT_RM_APP_LIFE_TIME_MONITOR_INTERVAL_SEC);
+ this.monitoringInterval =
+ (applicationTimeOurMonitorInterval > 0) ? (applicationTimeOurMonitorInterval * 1000)
+ : (YarnConfiguration.DEFAULT_RM_APP_LIFE_TIME_MONITOR_INTERVAL_SEC * 1000);
+ LOG.info("Application lifelime monitor interval configured as "
+ + this.monitoringInterval + " sec");
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ rmAppLifeTimeMonitorThread = new RMAppLifeTimeMonitorThread();
+ rmAppLifeTimeMonitorThread.setName("RMAppLifeTimeMonitorService");
+ rmAppLifeTimeMonitorThread.setDaemon(true);
+ rmAppLifeTimeMonitorThread.start();
+ LOG.info("Successfully started RMAppLifeTimeMonitorService");
+ }
+
+ private class RMAppLifeTimeMonitorThread extends Thread {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void run() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting RMAppLifeTimeMonitorThread.");
+ }
+ long applicationlifetime;
+ RMApp application = null;
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Entering RMAppLifeTimeMonitorThread.run");
+ }
+ for (Map.Entry e : monitoredApps.entrySet()) {
+ applicationlifetime = e.getValue();
+ application = rmContext.getRMApps().get(e.getKey());
+ if (application == null) {
+ continue;
+ }
+ if (!isApplicationInFinalState(application)) {
+ long currentTime = clock.getTime();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Application : " + application.getApplicationId()
+ + " lifetime : " + applicationlifetime + " "
+ + " currentTime : "
+ + currentTime + " SubmitTime : "
+ + application.getSubmitTime());
+ }
+ if ((currentTime - application.getSubmitTime()) > applicationlifetime) {
+ rmContext
+ .getDispatcher()
+ .getEventHandler()
+ .handle(
+ new RMAppEvent(application.getApplicationId(),
+ RMAppEventType.KILL));
+ LOG.warn("Application exceeds the life time configured."
+ + "Issuing kill command on the application- Application id: "
+ + application.getApplicationId().toString());
+ }
+ } else {
+ monitoredApps.remove(application.getApplicationId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Application "
+ + application.getApplicationId()
+ + " is removed from the lifetime"
+ + " monitor since it is completed");
+ }
+ }
+ }
+ } catch (YarnRuntimeException runexe) {
+ LOG.error("YarnRuntimeException exception occured while"
+ + " killing application based on application lifetime exhausted"
+ + runexe);
+ } catch (Exception exe) {
+ LOG.warn("Exception occurred while killing applicaiton "
+ + application.getApplicationId(), exe);
+ }
+ try {
+ Thread.sleep(monitoringInterval);
+ } catch (InterruptedException e1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RMAppLifeTimeMonitorThread got interrupted Exception."
+ + " Continuing with next iteration.");
+ }
+ }
+ }
+ }
+
+ private boolean isApplicationInFinalState(RMApp application) {
+ return application.isAppFinalStateStored()
+ || application.getState() == RMAppState.FINAL_SAVING
+ || application.getState() == RMAppState.KILLING;
+ }
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ stopped = true;
+ if (rmAppLifeTimeMonitorThread != null) {
+ rmAppLifeTimeMonitorThread.interrupt();
+ try {
+ rmAppLifeTimeMonitorThread.join();
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted Exception while stopping");
+ }
+ }
+ super.serviceStop();
+ }
+
+ public void register(ApplicationId ob, long timeOut) {
+ monitoredApps.putIfAbsent(ob, timeOut);
+ }
+
+ public void unregister(ApplicationId ob) {
+ monitoredApps.remove(ob);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 7ce42f5..593ade8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -441,8 +441,21 @@ public RMApp submitApp(Resource capability, String name, String user,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId, long attemptFailuresValidityInterval,
LogAggregationContext logAggregationContext,
- boolean cancelTokensWhenComplete, Priority priority)
- throws Exception {
+ boolean cancelTokensWhenComplete, Priority priority) throws Exception {
+ return submitApp(capability, name, user, acls, unmanaged, queue,
+ maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
+ isAppIdProvided, applicationId, attemptFailuresValidityInterval,
+ logAggregationContext, cancelTokensWhenComplete, priority, -1);
+ }
+
+ public RMApp submitApp(Resource capability, String name, String user,
+ Map acls, boolean unmanaged, String queue,
+ int maxAppAttempts, Credentials ts, String appType,
+ boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
+ ApplicationId applicationId, long attemptFailuresValidityInterval,
+ LogAggregationContext logAggregationContext,
+ boolean cancelTokensWhenComplete, Priority priority,
+ long applicationLifeTime) throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
if (! isAppIdProvided) {
@@ -458,6 +471,9 @@ public RMApp submitApp(Resource capability, String name, String user,
sub.setApplicationId(appId);
sub.setApplicationName(name);
sub.setMaxAppAttempts(maxAppAttempts);
+ if (-1 != applicationLifeTime) {
+ sub.setApplicationLifeTime(applicationLifeTime);
+ }
if (unmanaged) {
sub.setUnmanagedAM(true);
}
@@ -810,4 +826,14 @@ public void clearQueueMetrics(RMApp app) {
public RMActiveServices getRMActiveService() {
return activeServices;
}
+ public RMApp submitApp(int masterMemory, Priority priority,
+ long applicationLifeTime) throws Exception {
+ Resource resource = Resource.newInstance(masterMemory, 0);
+ return submitApp(
+ resource,"",UserGroupInformation.getCurrentUser().getShortUserName(),
+ null,false,null,
+ super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
+ false, false, null, 0, null, true, priority, applicationLifeTime);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifeTimeMonitorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifeTimeMonitorService.java
new file mode 100644
index 0000000..3dd86c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifeTimeMonitorService.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class for application life time monitor feature test.
+ */
+public class TestApplicationLifeTimeMonitorService {
+ private final int GB = 1024;
+
+ private YarnConfiguration conf;
+
+ @Before
+ public void setup() throws IOException {
+ conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ Logger rootLogger = LogManager.getRootLogger();
+ rootLogger.setLevel(Level.DEBUG);
+ UserGroupInformation.setConfiguration(conf);
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+ false);
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ }
+
+ @Test(timeout = 60000)
+ public void testApplicationLifeTimeMonitor() throws Exception {
+ MockRM rm = null;
+ try {
+ Configuration conf = new Configuration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.set(YarnConfiguration.RM_LIFE_TIME_MONITOR_INTERVAL_SEC, "1");
+ rm = new MockRM(conf);
+ rm.start();
+ Priority appPriority = Priority.newInstance(0);
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB);
+ RMApp app1 = rm.submitApp(1 * GB, appPriority, 5);
+ nm1.nodeHeartbeat(true);
+ // Send launch Event
+ MockAM am1 =
+ rm.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
+ am1.registerAppAttempt();
+ rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+ Assert.assertTrue("Applicaiton killed before life timeout value",
+ (System.currentTimeMillis() - app1.getSubmitTime()) > 5000);
+ } finally {
+ stopRM(rm);
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testAppLifeTimeMonitorAfterRecovery() throws Exception {
+ MockRM rm1 = null;
+ MockRM rm2 = null;
+ try {
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+
+ conf.set(YarnConfiguration.RM_LIFE_TIME_MONITOR_INTERVAL_SEC, "3");
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ // start RM
+ rm1 = createMockRM(conf, memStore);
+ rm1.start();
+ // Start NM
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ nm1.nodeHeartbeat(true);
+ // Submit APP with timeout
+ Priority appPriority = Priority.newInstance(0);
+ RMApp app1 = rm1.submitApp(1 * GB, appPriority, 10);
+ nm1.nodeHeartbeat(true);
+ // assert app1 attempt is saved
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
+ rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+ // launch the AM
+ MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
+ am1.registerAppAttempt();
+ // Start Second RM
+ rm2 = createMockRM(conf, memStore);
+ rm2.start();
+ // change NM to point to new RM
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ RMApp loadedApp1 =
+ rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+ rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
+ am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+ // new NM to represent NM re-register
+ nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
+ attemptId1 = attempt1.getAppAttemptId();
+ rm2.waitForState(attemptId1, RMAppAttemptState.KILLED);
+ Assert.assertTrue(
+ "Applicaiton killed before life timeout value. Finish time : "
+ + loadedApp1.getFinishTime() + " submit Time : "
+ + loadedApp1.getSubmitTime(),
+ (System.currentTimeMillis() - loadedApp1.getSubmitTime()) >= 10000);
+ } finally {
+ stopRM(rm1);
+ stopRM(rm2);
+ }
+ }
+
+ /**
+ *
+ * @return a new MockRM that will be stopped at the end of the test.
+ */
+ private MockRM createMockRM(YarnConfiguration conf, RMStateStore store) {
+ MockRM rm = new MockRM(conf, store);
+ return rm;
+ }
+
+ private void stopRM(MockRM rm) {
+ if (rm != null) {
+ rm.stop();
+ }
+ }
+}