From 2d84325c07ea7fb72d1a8fc121a249e1a8a778ed Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Sun, 4 Sep 2016 23:47:07 +0530 Subject: [PATCH] YARN-4205.patch --- .../api/records/ApplicationSubmissionContext.java | 21 +++ .../apache/hadoop/yarn/conf/YarnConfiguration.java | 6 + .../src/main/proto/yarn_protos.proto | 1 + .../pb/ApplicationSubmissionContextPBImpl.java | 15 ++ .../yarn/util/AbstractLivelinessMonitor.java | 28 ++-- .../src/main/resources/yarn-default.xml | 9 ++ .../resourcemanager/RMActiveServiceContext.java | 16 +++ .../yarn/server/resourcemanager/RMContext.java | 5 + .../yarn/server/resourcemanager/RMContextImpl.java | 12 ++ .../server/resourcemanager/ResourceManager.java | 9 ++ .../server/resourcemanager/rmapp/RMAppImpl.java | 24 ++++ .../rmapp/RMAppLifetimeMonitor.java | 114 +++++++++++++++ .../hadoop/yarn/server/resourcemanager/MockRM.java | 20 ++- .../rmapp/TestApplicationLifetimeMonitor.java | 160 +++++++++++++++++++++ 14 files changed, 428 insertions(+), 12 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLifetimeMonitor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 21cd1bb..271efa6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ 
-535,4 +535,25 @@ public abstract void setLogAggregationContext( @Public @Unstable public abstract void setReservationID(ReservationId reservationID); + + /** + * Get the application lifetime value. + * The application will be killed if it is not completed in the given time. + * + * @return application lifetime value in seconds. + */ + @Public + @Unstable + public abstract long getApplicationLifetime(); + + /** + * Set the application lifetime value. The application will be killed if it is + * not completed in the given time.
+ * Note: Do not configure a very small value. + * + * @param applicationLifetime application lifetime value in seconds + */ + @Public + @Unstable + public abstract void setApplicationLifetime(long applicationLifetime); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 46e3323..3789a4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1529,6 +1529,12 @@ public static boolean isAclEnabled(Configuration conf) { false; + // Configurations for application lifetime monitor feature + public static final String RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS = + RM_PREFIX + "application.lifetimeout-monitor.interval-ms"; + + public static final int DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS = + 60000; /** * Interval of time the linux container executor should try cleaning up diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 2d6007e..e41e220 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -365,6 +365,7 @@ message ApplicationSubmissionContextProto { optional ReservationIdProto reservation_id = 15; optional string node_label_expression = 16; optional ResourceRequestProto am_container_resource_request = 17; + optional int64 application_lifetime = 18 [default = -1]; } message LogAggregationContextProto { diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index 67e3a84..fbc677a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -548,4 +548,19 @@ private ReservationIdPBImpl convertFromProtoFormat(ReservationIdProto p) { private ReservationIdProto convertToProtoFormat(ReservationId t) { return ((ReservationIdPBImpl) t).getProto(); } + + @Override + public long getApplicationLifetime() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasApplicationLifetime()) { + return -1; + } + return (p.getApplicationLifetime()); + } + + @Override + public void setApplicationLifetime(long applicationLifetime) { + maybeInitBuilder(); + builder.setApplicationLifetime(applicationLifetime); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java index e80d032..edbc36c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java @@ -44,8 +44,8 @@ private Thread checkerThread; private volatile boolean stopped; public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins - private int expireInterval = DEFAULT_EXPIRE; - private int 
monitorInterval = expireInterval/3; + private long expireInterval = DEFAULT_EXPIRE; + private long monitorInterval = expireInterval / 3; private final Clock clock; @@ -85,6 +85,11 @@ protected void setExpireInterval(int expireInterval) { this.expireInterval = expireInterval; } + protected long getExpireInterval(O o) { + // by-default return for all the registered object interval. + return this.expireInterval; + } + protected void setMonitorInterval(int monitorInterval) { this.monitorInterval = monitorInterval; } @@ -97,7 +102,11 @@ public synchronized void receivedPing(O ob) { } public synchronized void register(O ob) { - running.put(ob, clock.getTime()); + register(ob, clock.getTime()); + } + + public synchronized void register(O ob, long timeout) { + running.put(ob, timeout); } public synchronized void unregister(O ob) { @@ -117,19 +126,20 @@ public synchronized void resetTimer() { public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { synchronized (AbstractLivelinessMonitor.this) { - Iterator> iterator = - running.entrySet().iterator(); + Iterator> iterator = running.entrySet().iterator(); - //avoid calculating current time everytime in loop + // avoid calculating current time everytime in loop long currentTime = clock.getTime(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); + O key = entry.getKey(); + long expireInterval = getExpireInterval(key); if (currentTime > entry.getValue() + expireInterval) { iterator.remove(); - expire(entry.getKey()); - LOG.info("Expired:" + entry.getKey().toString() + - " Timed out after " + expireInterval/1000 + " secs"); + expire(key); + LOG.info("Expired:" + entry.getKey().toString() + + " Timed out after " + expireInterval / 1000 + " secs"); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e956507..7e94d01 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3069,4 +3069,13 @@ yarn.resourcemanager.node-removal-untracked.timeout-ms 60000 + + + + The RMAppLifeTimeMonitorService uses this value as monitor interval. + + yarn.resourcemanager.application.lifetimeout-monitor.interval-ms + 60000 + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index caa0ff13..0bf5f21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppLifetimeMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -105,6 +106,8 @@ private boolean isSchedulerReady = false; private PlacementManager queuePlacementManager = null; + private RMAppLifetimeMonitor rmAppLifetimeMonitor; + public RMActiveServiceContext() { queuePlacementManager = new 
PlacementManager(); } @@ -467,4 +470,17 @@ public PlacementManager getQueuePlacementManager() { public void setQueuePlacementManager(PlacementManager placementMgr) { this.queuePlacementManager = placementMgr; } + + @Private + @Unstable + public void setRMAppLifetimeMonitor( + RMAppLifetimeMonitor rmAppLifetimeMonitor) { + this.rmAppLifetimeMonitor = rmAppLifetimeMonitor; + } + + @Private + @Unstable + public RMAppLifetimeMonitor getRMAppLifetimeMonitor() { + return this.rmAppLifetimeMonitor; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 2ba445c..f313a52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppLifetimeMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -149,4 +150,8 @@ void setRMDelegatedNodeLabelsUpdater( LeaderElectorService getLeaderElectorService(); QueueLimitCalculator getNodeManagerQueueLimitCalculator(); + + void setRMAppLifetimeMonitor(RMAppLifetimeMonitor 
rmAppLifetimeMonitor); + + RMAppLifetimeMonitor getRMAppLifetimeMonitor(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 1e702de..b6d4fb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppLifetimeMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -499,4 +500,15 @@ public void setContainerQueueLimitCalculator( QueueLimitCalculator limitCalculator) { this.queueLimitCalculator = limitCalculator; } + + @Override + public void setRMAppLifetimeMonitor( + RMAppLifetimeMonitor rmAppLifeTimeMonitor) { + this.activeServiceContext.setRMAppLifetimeMonitor(rmAppLifeTimeMonitor); + } + + @Override + public RMAppLifetimeMonitor getRMAppLifetimeMonitor() { + return this.activeServiceContext.getRMAppLifetimeMonitor(); + } } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index bf72fc1..d1c3e7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppLifetimeMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -556,6 +557,10 @@ protected void serviceInit(Configuration configuration) throws Exception { addService(amFinishingMonitor); rmContext.setAMFinishingMonitor(amFinishingMonitor); + RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor(); + addService(rmAppLifetimeMonitor); + rmContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor); + RMNodeLabelsManager nlm = createNodeLabelManager(); nlm.setRMContext(rmContext); addService(nlm); @@ -1398,4 +1403,8 @@ private static void printUsage(PrintStream out) { out.println(" " + "[-remove-application-from-state-store ]" + "\n"); } + + protected RMAppLifetimeMonitor createRMAppLifetimeMonitor() { + return new RMAppLifetimeMonitor(this.rmContext); + } } 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index e5bde32..71ebda2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1106,6 +1106,15 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { } } + long applicationLifetime = app.submissionContext.getApplicationLifetime(); + if (applicationLifetime > 0) { + app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId, + app.submitTime, applicationLifetime * 1000); + LOG.info("Application " + app.applicationId + + " is registered with Application lifetime monitor after recovery. " + + "The lifetime configured is " + applicationLifetime + " seconds"); + } + // No existent attempts means the attempt associated with this app was not // started or started but not yet saved. 
if (app.attempts.isEmpty()) { @@ -1152,6 +1161,12 @@ public void transition(RMAppImpl app, RMAppEvent event) { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { + + if (app.submissionContext.getApplicationLifetime() > 0) { + app.rmContext.getRMAppLifetimeMonitor() + .unregisterApp(app.applicationId); + } + if (app.transitionTodo instanceof SingleArcTransition) { ((SingleArcTransition) app.transitionTodo).transition(app, app.eventCausingFinalSaving); @@ -1209,6 +1224,15 @@ private String getAppAttemptFailedDiagnostics(RMAppEvent event) { @Override public void transition(RMAppImpl app, RMAppEvent event) { + long applicationLifetime = app.submissionContext.getApplicationLifetime(); + if (applicationLifetime > 0) { + app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId, + app.submitTime, applicationLifetime * 1000); + LOG.info("Application " + app.applicationId + + " is registered with Application lifetime monitor. " + + "The lifetime configured is " + applicationLifetime + " seconds"); + } + // If recovery is enabled then store the application information in a // non-blocking call so make sure that RM has stored the information // needed to restart the AM after RM restart without further client diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLifetimeMonitor.java new file mode 100644 index 0000000..8781f9e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLifetimeMonitor.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import java.util.EnumSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor; +import org.apache.hadoop.yarn.util.SystemClock; + +/** + * This service monitors applications against their configured lifetime value. + * An application will be killed if it runs beyond its given lifetime.
+ */ +public class RMAppLifetimeMonitor + extends AbstractLivelinessMonitor { + + private static final Log LOG = LogFactory.getLog(RMAppLifetimeMonitor.class); + + private RMContext rmContext; + private ConcurrentMap monitoredApps = + new ConcurrentHashMap(); + + private static final EnumSet COMPLETED_APP_STATES = + EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED, + RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING); + + public RMAppLifetimeMonitor(RMContext rmContext) { + super(RMAppLifetimeMonitor.class.getName(), SystemClock.getInstance()); + this.rmContext = rmContext; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + int monitorInterval = conf.getInt( + YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS); + if (monitorInterval <= 0) { + monitorInterval = + YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS; + } + setMonitorInterval(monitorInterval); + LOG.info("Application lifelime monitor interval set to " + monitorInterval + + " ms."); + super.serviceInit(conf); + } + + @SuppressWarnings("unchecked") + @Override + protected void expire(ApplicationId appId) { + monitoredApps.remove(appId); + RMApp app = rmContext.getRMApps().get(appId); + if (app == null) { + return; + } + // Don't trigger an KILL event if application is in completed states + if (!COMPLETED_APP_STATES.contains(app.getState())) { + String diagnosis = + "Application killed due to exceeding its lifetime period " + + app.getApplicationSubmissionContext().getApplicationLifetime() + + " seconds"; + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(appId, RMAppEventType.KILL, diagnosis)); + } else { + LOG.info("Application " + appId + + " is about to complete. 
So not killing the application."); + } + } + + public void registerApp(ApplicationId appId, long submitTime, + long applicationLifetime) { + monitoredApps.putIfAbsent(appId, applicationLifetime); + register(appId, submitTime); // lifetime must be visible to checker first + } + + @Override + protected long getExpireInterval(ApplicationId appId) { + return monitoredApps.get(appId); + } + + public void unregisterApp(ApplicationId appId) { + unregister(appId); + monitoredApps.remove(appId); + } + + public void updateApplicationLifetime(ApplicationId appId, + long newAppLifetime) { + // only update for the registered objects + monitoredApps.replace(appId, newAppLifetime); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index f843261..5b9b8f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -460,7 +460,7 @@ public RMApp submitApp(int masterMemory, String name, String user, return submitApp(resource, name, user, acls, false, queue, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, - false, null, 0, null, true, priority, amLabel); + false, null, 0, null, true, priority, amLabel, -1); } public RMApp submitApp(Resource resource, String name, String user, @@ -561,7 +561,7 @@ public RMApp submitApp(Resource capability, String name, String user, return submitApp(capability, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType,
waitForAccepted, keepContainers, isAppIdProvided, applicationId, attemptFailuresValidityInterval, - logAggregationContext, cancelTokensWhenComplete, priority, ""); + logAggregationContext, cancelTokensWhenComplete, priority, "", -1); } public RMApp submitApp(Resource capability, String name, String user, @@ -570,7 +570,8 @@ public RMApp submitApp(Resource capability, String name, String user, boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, ApplicationId applicationId, long attemptFailuresValidityInterval, LogAggregationContext logAggregationContext, - boolean cancelTokensWhenComplete, Priority priority, String amLabel) + boolean cancelTokensWhenComplete, Priority priority, String amLabel, + long applicationLifetime) throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); @@ -587,6 +588,9 @@ public RMApp submitApp(Resource capability, String name, String user, sub.setApplicationId(appId); sub.setApplicationName(name); sub.setMaxAppAttempts(maxAppAttempts); + if (-1 != applicationLifetime) { + sub.setApplicationLifetime(applicationLifetime); + } if (unmanaged) { sub.setUnmanagedAM(true); } @@ -1073,4 +1077,14 @@ public void waitForAppRemovedFromScheduler(ApplicationId appId, !apps.containsKey(appId)); LOG.info("app is removed from scheduler, " + appId); } + public RMApp submitApp(int masterMemory, Priority priority, + long applicationLifeTime) throws Exception { + Resource resource = Resource.newInstance(masterMemory, 0); + return submitApp( + resource,"",UserGroupInformation.getCurrentUser().getShortUserName(), + null,false,null, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, + false, false, null, 0, null, true, priority, null, applicationLifeTime); + } } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java new file mode 100644 index 0000000..129a856 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Set; + +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart; +import org.apache.hadoop.yarn.server.resourcemanager.TestWorkPreservingRMRestart; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test class for application life time monitor feature test. 
+ */ +public class TestApplicationLifetimeMonitor { + private final int GB = 1024; + + private YarnConfiguration conf; + + @Before + public void setup() throws IOException { + conf = new YarnConfiguration(); + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + UserGroupInformation.setConfiguration(conf); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, + true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setLong(YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS, + 3000L); + } + + @Test(timeout = 90000) + public void testApplicationLifeTimeMonitor() throws Exception { + MockRM rm = null; + try { + rm = new MockRM(conf); + rm.start(); + Priority appPriority = Priority.newInstance(0); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB); + RMApp app1 = rm.submitApp(1 * GB, appPriority, 10); + nm1.nodeHeartbeat(true); + // Send launch Event + MockAM am1 = + rm.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId()); + am1.registerAppAttempt(); + rm.waitForState(app1.getApplicationId(), RMAppState.KILLED); + Assert.assertTrue("Applicaiton killed before life timeout value", + (System.currentTimeMillis() - app1.getSubmitTime()) > 10000); + } finally { + stopRM(rm); + } + } + + @SuppressWarnings("rawtypes") + @Test(timeout = 180000) + public void testApplicationLifeTimeOnRMRestart() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + nm1.nodeHeartbeat(true); + + long appLifetime = 60l; + RMApp app1 = rm1.submitApp(200, Priority.newInstance(0), appLifetime); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Re-start RM + MockRM rm2 = new MockRM(conf, memStore); + 
rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + + // recover app + RMApp recoveredApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + + NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus( + am1.getApplicationAttemptId(), 1, ContainerState.RUNNING); + NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus( + am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); + + nm1.registerNode(Arrays.asList(amContainer, runningContainer), null); + + // Wait for RM to settle down on recovering containers; + TestWorkPreservingRMRestart.waitForNumContainersToRecover(2, rm2, + am1.getApplicationAttemptId()); + Set launchedContainers = + ((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId())) + .getLaunchedContainers(); + assertTrue(launchedContainers.contains(amContainer.getContainerId())); + assertTrue(launchedContainers.contains(runningContainer.getContainerId())); + + // check RMContainers are re-recreated and the container state is correct. + rm2.waitForState(nm1, amContainer.getContainerId(), + RMContainerState.RUNNING); + rm2.waitForState(nm1, runningContainer.getContainerId(), + RMContainerState.RUNNING); + + // re register attempt to rm2 + rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.ACCEPTED); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am1.registerAppAttempt(); + rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.RUNNING); + + // wait for the kill; appLifetime is in seconds, elapsed time is in ms. + rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.KILLED); + Assert.assertTrue("Applicaiton killed before life timeout value", + (System.currentTimeMillis() + - recoveredApp1.getSubmitTime()) > appLifetime * 1000); + } + + private void stopRM(MockRM rm) { + if (rm != null) { + rm.stop(); + } + } +} -- 2.7.4 (Apple Git-66)