.../api/records/ApplicationSubmissionContext.java | 20 +++ .../apache/hadoop/yarn/conf/YarnConfiguration.java | 5 + .../src/main/proto/yarn_protos.proto | 1 + .../pb/ApplicationSubmissionContextPBImpl.java | 15 ++ .../src/main/resources/yarn-default.xml | 9 + .../resourcemanager/RMActiveServiceContext.java | 11 ++ .../yarn/server/resourcemanager/RMContext.java | 5 + .../yarn/server/resourcemanager/RMContextImpl.java | 11 ++ .../server/resourcemanager/ResourceManager.java | 10 ++ .../server/resourcemanager/rmapp/RMAppImpl.java | 27 +++ .../rmapp/attempt/RMAppLifeTimeMonitorService.java | 186 +++++++++++++++++++++ .../hadoop/yarn/server/resourcemanager/MockRM.java | 30 +++- .../TestApplicationLifeTimeMonitorService.java | 162 ++++++++++++++++++ 13 files changed, 490 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 21cd1bb..41528fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -535,4 +535,24 @@ public abstract void setLogAggregationContext( @Public @Unstable public abstract void setReservationID(ReservationId reservationID); + + /** + * Get the application lifetime value. + * The application will be killed if is not completed in the given time. + * + * @return application lifetime value. + */ + @Public + @Unstable + public abstract long getApplicationLifeTime(); + + /** + * Set the application lifetime value. + * The application will be killed if is not completed in the given time. + * + * @param applicationLifeTime application lifetime value + */ + @Public + @Unstable + public abstract void setApplicationLifeTime(long applicationLifeTime); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 33e8a1f..a79ed0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2037,6 +2037,11 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) { YARN_PREFIX + "am.blacklisting.disable-failure-threshold"; public static final float DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD = 0.8f; + // Configurations for applicaiton life time monitor feature + public static final String RM_LIFE_TIME_MONITOR_INTERVAL_SEC = YARN_PREFIX + + "app.lifetime.monitor.interval-sec"; + + public static final long DEFAULT_RM_APP_LIFE_TIME_MONITOR_INTERVAL_SEC = 60L; public YarnConfiguration() { super(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 057aeee..929409d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -338,6 +338,7 @@ message ApplicationSubmissionContextProto { optional ReservationIdProto reservation_id = 15; optional string node_label_expression = 16; optional ResourceRequestProto am_container_resource_request = 17; + optional int64 app_lifetime_value = 18 [default = -1]; } message LogAggregationContextProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index 67e3a84..95469f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -548,4 +548,19 @@ private ReservationIdPBImpl convertFromProtoFormat(ReservationIdProto p) { private ReservationIdProto convertToProtoFormat(ReservationId t) { return ((ReservationIdPBImpl) t).getProto(); } + + @Override + public long getApplicationLifeTime() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasAppLifetimeValue()) { + return -1; + } + return (p.getAppLifetimeValue()); + } + + @Override + public void setApplicationLifeTime(long appLifeTimeValue) { + maybeInitBuilder(); + builder.setAppLifetimeValue(appLifeTimeValue); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index bcd64c3..6420695 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2311,4 +2311,13 @@ yarn.am.blacklisting.disable-failure-threshold 0.8f + + + + The RMAppLifeTimeMonitorService Monitor interval. + + yarn.app.lifetime.monitor.interval-sec + 60 + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index c71323f..2c0c928 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppLifeTimeMonitorService; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -102,6 +103,8 @@ private boolean isSchedulerReady = false; private PlacementManager queuePlacementManager = null; + private RMAppLifeTimeMonitorService rmAppLifeTimeMonitor; + public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); } @@ -438,4 +441,12 @@ public PlacementManager getQueuePlacementManager() { public void setQueuePlacementManager(PlacementManager placementMgr) { this.queuePlacementManager = placementMgr; } + + public void setRMAppLifeTimeMonitor(RMAppLifeTimeMonitorService rmAppLifeTimeMonitor) { + this.rmAppLifeTimeMonitor = rmAppLifeTimeMonitor; + } + + public RMAppLifeTimeMonitorService getRMAppTimeOutMonitor() { + return this.rmAppLifeTimeMonitor; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index b64c834..453f27f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppLifeTimeMonitorService; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -129,4 +130,8 @@ void setRMApplicationHistoryWriter( PlacementManager getQueuePlacementManager(); void setQueuePlacementManager(PlacementManager placementMgr); + + void setRMAppLifeTimeMonitor(RMAppLifeTimeMonitorService rmAppLifeTimeMonitor); + + RMAppLifeTimeMonitorService getRMAppLifeTimeMonitor(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 840cea7..25e435d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppLifeTimeMonitorService; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -448,4 +449,14 @@ public PlacementManager getQueuePlacementManager() { public void setQueuePlacementManager(PlacementManager placementMgr) { this.activeServiceContext.setQueuePlacementManager(placementMgr); } + + @Override + public void setRMAppLifeTimeMonitor(RMAppLifeTimeMonitorService rmAppLifeTimeMonitor) { + this.activeServiceContext.setRMAppLifeTimeMonitor(rmAppLifeTimeMonitor); + } + + @Override + public RMAppLifeTimeMonitorService getRMAppLifeTimeMonitor() { + return this.activeServiceContext.getRMAppTimeOutMonitor(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index d1f339a..aa227e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppLifeTimeMonitorService; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; @@ -408,6 +409,7 @@ protected static void validateConfigs(Configuration conf) { private ResourceManager rm; private boolean recoveryEnabled; private RMActiveServiceContext activeServiceContext; + protected RMAppLifeTimeMonitorService rmAppLifeTimeMonitor; RMActiveServices(ResourceManager rm) { super("RMActiveServices"); @@ -435,6 +437,10 @@ protected void serviceInit(Configuration configuration) throws Exception { addService(amFinishingMonitor); rmContext.setAMFinishingMonitor(amFinishingMonitor); + rmAppLifeTimeMonitor = createRMAppLifeTimeMonitor(); + addService(rmAppLifeTimeMonitor); + rmContext.setRMAppLifeTimeMonitor(rmAppLifeTimeMonitor); + RMNodeLabelsManager nlm = createNodeLabelManager(); nlm.setRMContext(rmContext); addService(nlm); @@ -1284,4 +1290,8 @@ private static void printUsage(PrintStream out) { out.println(" " + "[-remove-application-from-state-store ]" + "\n"); } + + protected RMAppLifeTimeMonitorService createRMAppLifeTimeMonitor() { + return new RMAppLifeTimeMonitorService(this.rmContext); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 42d889e..d24eb91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -959,6 +959,19 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { } } + // Handle the application lifetime value during recovery + long applicationLifeTime = app.submissionContext.getApplicationLifeTime(); + if (applicationLifeTime > 0) { + app.rmContext.getRMAppLifeTimeMonitor().register( + app.getApplicationId(), + app.submissionContext.getApplicationLifeTime() * 1000); + if (LOG.isDebugEnabled()) { + LOG.debug("Application " + app.getApplicationId() + + " is registered with Application lifetime monitor. " + + "The lifetime configured is " + + app.submissionContext.getApplicationLifeTime() + " seconds"); + } + } // No existent attempts means the attempt associated with this app was not // started or started but not yet saved. if (app.attempts.isEmpty()) { @@ -998,6 +1011,20 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) { app.handler.handle(new AppAddedSchedulerEvent(app.user, app.submissionContext, false)); + + // Set the application lifetime value + long applicationLifeTime = app.submissionContext.getApplicationLifeTime(); + if (applicationLifeTime > 0) { + app.rmContext.getRMAppLifeTimeMonitor().register( + app.getApplicationId(), applicationLifeTime * 1000); + if (LOG.isDebugEnabled()) { + LOG.debug("Application " + + app.getApplicationId() + + " is registered with Application lifetime monitor. " + + "The lifetime configured is " + + app.submissionContext.getApplicationLifeTime() + " seconds"); + } + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppLifeTimeMonitorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppLifeTimeMonitorService.java new file mode 100644 index 0000000..3bdfe37 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppLifeTimeMonitorService.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; + +/** + * This service will monitor the applications against the lifetime value given. + * The applications will be killed if it running beyond the given time. + */ +public class RMAppLifeTimeMonitorService extends AbstractService { + + private static final Log LOG = LogFactory.getLog(RMAppLifeTimeMonitorService.class); + + private RMContext rmContext; + + private Clock clock; + private Thread rmAppLifeTimeMonitorThread = null; + private volatile boolean stopped = false; + + private long monitoringInterval = 0; + + private ConcurrentMap monitoredApps = + new ConcurrentHashMap(); + + public RMAppLifeTimeMonitorService(RMContext rmContext) { + super(RMAppLifeTimeMonitorService.class.getName()); + this.rmContext = rmContext; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + stopped = false; + super.serviceInit(conf); + clock = new SystemClock(); + long applicationTimeOurMonitorInterval = + conf.getLong(YarnConfiguration.RM_LIFE_TIME_MONITOR_INTERVAL_SEC, + YarnConfiguration.DEFAULT_RM_APP_LIFE_TIME_MONITOR_INTERVAL_SEC); + this.monitoringInterval = + (applicationTimeOurMonitorInterval > 0) ? (applicationTimeOurMonitorInterval * 1000) + : (YarnConfiguration.DEFAULT_RM_APP_LIFE_TIME_MONITOR_INTERVAL_SEC * 1000); + LOG.info("Application lifelime monitor interval configured as " + + this.monitoringInterval + " sec"); + } + + @Override + protected void serviceStart() throws Exception { + rmAppLifeTimeMonitorThread = new RMAppLifeTimeMonitorThread(); + rmAppLifeTimeMonitorThread.setName("RMAppLifeTimeMonitorService"); + rmAppLifeTimeMonitorThread.setDaemon(true); + rmAppLifeTimeMonitorThread.start(); + LOG.info("Successfully started RMAppLifeTimeMonitorService"); + } + + private class RMAppLifeTimeMonitorThread extends Thread { + + @SuppressWarnings("unchecked") + @Override + public void run() { + if (LOG.isDebugEnabled()) { + LOG.debug("Starting RMAppLifeTimeMonitorThread."); + } + long applicationlifetime; + RMApp application = null; + while (!stopped && !Thread.currentThread().isInterrupted()) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Entering RMAppLifeTimeMonitorThread.run"); + } + for (Map.Entry e : monitoredApps.entrySet()) { + applicationlifetime = e.getValue(); + application = rmContext.getRMApps().get(e.getKey()); + if (application == null) { + continue; + } + if (!isApplicationInFinalState(application)) { + long currentTime = clock.getTime(); + if (LOG.isDebugEnabled()) { + LOG.debug("Application : " + application.getApplicationId() + + " lifetime : " + applicationlifetime + " " + + " currentTime : " + + currentTime + " SubmitTime : " + + application.getSubmitTime()); + } + if ((currentTime - application.getSubmitTime()) > applicationlifetime) { + rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMAppEvent(application.getApplicationId(), + RMAppEventType.KILL)); + LOG.warn("Application exceeds the life time configured." + + "Issuing kill command on the application- Application id: " + + application.getApplicationId().toString()); + } + } else { + monitoredApps.remove(application.getApplicationId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Application " + + application.getApplicationId() + + " is removed from the lifetime" + + " monitor since it is completed"); + } + } + } + } catch (YarnRuntimeException runexe) { + LOG.error("YarnRuntimeException exception occured while" + + " killing application based on application lifetime exhausted" + + runexe); + } catch (Exception exe) { + LOG.warn("Exception occurred while killing applicaiton " + + application.getApplicationId(), exe); + } + try { + Thread.sleep(monitoringInterval); + } catch (InterruptedException e1) { + if (LOG.isDebugEnabled()) { + LOG.debug("RMAppLifeTimeMonitorThread got interrupted Exception." + + " Continuing with next iteration."); + } + } + } + } + + private boolean isApplicationInFinalState(RMApp application) { + return application.isAppFinalStateStored() + || application.getState() == RMAppState.FINAL_SAVING + || application.getState() == RMAppState.KILLING; + } + } + + @Override + protected void serviceStop() throws Exception { + stopped = true; + if (rmAppLifeTimeMonitorThread != null) { + rmAppLifeTimeMonitorThread.interrupt(); + try { + rmAppLifeTimeMonitorThread.join(); + } catch (InterruptedException ie) { + LOG.warn("Interrupted Exception while stopping"); + } + } + super.serviceStop(); + } + + public void register(ApplicationId ob, long timeOut) { + monitoredApps.putIfAbsent(ob, timeOut); + } + + public void unregister(ApplicationId ob) { + monitoredApps.remove(ob); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 7ce42f5..593ade8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -441,8 +441,21 @@ public RMApp submitApp(Resource capability, String name, String user, boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, ApplicationId applicationId, long attemptFailuresValidityInterval, LogAggregationContext logAggregationContext, - boolean cancelTokensWhenComplete, Priority priority) - throws Exception { + boolean cancelTokensWhenComplete, Priority priority) throws Exception { + return submitApp(capability, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, waitForAccepted, keepContainers, + isAppIdProvided, applicationId, attemptFailuresValidityInterval, + logAggregationContext, cancelTokensWhenComplete, priority, -1); + } + + public RMApp submitApp(Resource capability, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, + ApplicationId applicationId, long attemptFailuresValidityInterval, + LogAggregationContext logAggregationContext, + boolean cancelTokensWhenComplete, Priority priority, + long applicationLifeTime) throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); if (! isAppIdProvided) { @@ -458,6 +471,9 @@ public RMApp submitApp(Resource capability, String name, String user, sub.setApplicationId(appId); sub.setApplicationName(name); sub.setMaxAppAttempts(maxAppAttempts); + if (-1 != applicationLifeTime) { + sub.setApplicationLifeTime(applicationLifeTime); + } if (unmanaged) { sub.setUnmanagedAM(true); } @@ -810,4 +826,14 @@ public void clearQueueMetrics(RMApp app) { public RMActiveServices getRMActiveService() { return activeServices; } + public RMApp submitApp(int masterMemory, Priority priority, + long applicationLifeTime) throws Exception { + Resource resource = Resource.newInstance(masterMemory, 0); + return submitApp( + resource,"",UserGroupInformation.getCurrentUser().getShortUserName(), + null,false,null, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, + false, false, null, 0, null, true, priority, applicationLifeTime); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifeTimeMonitorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifeTimeMonitorService.java new file mode 100644 index 0000000..3dd86c9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifeTimeMonitorService.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test class for application life time monitor feature test. + */ +public class TestApplicationLifeTimeMonitorService { + private final int GB = 1024; + + private YarnConfiguration conf; + + @Before + public void setup() throws IOException { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + UserGroupInformation.setConfiguration(conf); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, + false); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + } + + @Test(timeout = 60000) + public void testApplicationLifeTimeMonitor() throws Exception { + MockRM rm = null; + try { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.set(YarnConfiguration.RM_LIFE_TIME_MONITOR_INTERVAL_SEC, "1"); + rm = new MockRM(conf); + rm.start(); + Priority appPriority = Priority.newInstance(0); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB); + RMApp app1 = rm.submitApp(1 * GB, appPriority, 5); + nm1.nodeHeartbeat(true); + // Send launch Event + MockAM am1 = + rm.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId()); + am1.registerAppAttempt(); + rm.waitForState(app1.getApplicationId(), RMAppState.KILLED); + Assert.assertTrue("Applicaiton killed before life timeout value", + (System.currentTimeMillis() - app1.getSubmitTime()) > 5000); + } finally { + stopRM(rm); + } + } + + @Test(timeout = 60000) + public void testAppLifeTimeMonitorAfterRecovery() throws Exception { + MockRM rm1 = null; + MockRM rm2 = null; + try { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); + + conf.set(YarnConfiguration.RM_LIFE_TIME_MONITOR_INTERVAL_SEC, "3"); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + // start RM + rm1 = createMockRM(conf, memStore); + rm1.start(); + // Start NM + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + nm1.nodeHeartbeat(true); + // Submit APP with timeout + Priority appPriority = Priority.newInstance(0); + RMApp app1 = rm1.submitApp(1 * GB, appPriority, 10); + nm1.nodeHeartbeat(true); + // assert app1 attempt is saved + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId(); + rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); + // launch the AM + MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + // Start Second RM + rm2 = createMockRM(conf, memStore); + rm2.start(); + // change NM to point to new RM + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + RMApp loadedApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + // new NM to represent NM re-register + nm1 = + new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); + attemptId1 = attempt1.getAppAttemptId(); + rm2.waitForState(attemptId1, RMAppAttemptState.KILLED); + Assert.assertTrue( + "Applicaiton killed before life timeout value. Finish time : " + + loadedApp1.getFinishTime() + " submit Time : " + + loadedApp1.getSubmitTime(), + (System.currentTimeMillis() - loadedApp1.getSubmitTime()) >= 10000); + } finally { + stopRM(rm1); + stopRM(rm2); + } + } + + /** + * + * @return a new MockRM that will be stopped at the end of the test. + */ + private MockRM createMockRM(YarnConfiguration conf, RMStateStore store) { + MockRM rm = new MockRM(conf, store); + return rm; + } + + private void stopRM(MockRM rm) { + if (rm != null) { + rm.stop(); + } + } +}