--- .../api/records/ApplicationSubmissionContext.java | 21 +++ .../apache/hadoop/yarn/conf/YarnConfiguration.java | 6 + .../src/main/proto/yarn_protos.proto | 1 + .../pb/ApplicationSubmissionContextPBImpl.java | 17 +++ .../yarn/server/resourcemanager/RMContext.java | 6 + .../yarn/server/resourcemanager/RMContextImpl.java | 13 ++ .../server/resourcemanager/ResourceManager.java | 12 +- .../server/resourcemanager/rmapp/RMAppImpl.java | 22 +++ .../rmapp/attempt/RMAppTimeOutMonitor.java | 158 +++++++++++++++++++++ .../hadoop/yarn/server/resourcemanager/MockRM.java | 99 ++++++++----- .../rmapp/TestApplicationTimeOut.java | 63 ++++++++ 11 files changed, 384 insertions(+), 34 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppTimeOutMonitor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationTimeOut.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 21cd1bb..112ec7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -535,4 +535,25 @@ public abstract void setLogAggregationContext( @Public @Unstable public abstract void setReservationID(ReservationId reservationID); + + /** + * Get the application timeout value. The application will be killed if is not + * completed in the given time + * + * @return application timeout value. + */ + @Public + @Unstable + public abstract long getApplicationTimeOut(); + + /** + * Set the application timeout value.The application will be killed if is not + * completed in the given time + * + * @param applicationTimeout application timeout value + */ + @Public + @Unstable + public abstract void setApplicationTimeOut(long applicationTimeout); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a18ef7c..15eebe6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2003,6 +2003,12 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) { public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS = NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels"; + // Configurations for applicaiton timeout feature + public static final String RM_APP_MONITOR_INTERVAL_SEC = YARN_PREFIX + + "app.timeout.monitor.interval"; + + public static final long DEFAULT_RM_APP_MONITOR_INTERVAL_SEC = 60L; + public YarnConfiguration() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 1bd3dda..61bff55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -336,6 +336,7 @@ message ApplicationSubmissionContextProto { optional ReservationIdProto reservation_id = 15; optional string node_label_expression = 16; optional ResourceRequestProto am_container_resource_request = 17; + optional int64 app_timeout_value = 18 [default = -1]; } message LogAggregationContextProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index 67e3a84..9de4ab9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -548,4 +548,21 @@ private ReservationIdPBImpl convertFromProtoFormat(ReservationIdProto p) { private ReservationIdProto convertToProtoFormat(ReservationId t) { return ((ReservationIdPBImpl) t).getProto(); } + + @Override + public long getApplicationTimeOut() { + + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasAppTimeoutValue()) { + return -1; + } + return (p.getAppTimeoutValue()); + } + + @Override + public void setApplicationTimeOut(long appTimeoutValue) { + maybeInitBuilder(); + builder.setAppTimeoutValue(appTimeoutValue); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index bc50268..1730d60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppTimeOutMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -124,4 +125,9 @@ void setRMApplicationHistoryWriter( boolean isSchedulerReadyForAllocatingContainers(); Configuration getYarnConfiguration(); + + void setRMAppTimeOutMonitor(RMAppTimeOutMonitor rmAppTimeOutMonitor); + + RMAppTimeOutMonitor getRMAppTimeOutMonitor(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index d6d573d..53297bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppTimeOutMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -68,6 +69,8 @@ private Configuration yarnConfiguration; + private RMAppTimeOutMonitor rmAppTimeOutMonitor; + private RMApplicationHistoryWriter rmApplicationHistoryWriter; private SystemMetricsPublisher systemMetricsPublisher; @@ -438,4 +441,14 @@ public Configuration getYarnConfiguration() { public void setYarnConfiguration(Configuration yarnConfiguration) { this.yarnConfiguration=yarnConfiguration; } + + @Override + public void setRMAppTimeOutMonitor(RMAppTimeOutMonitor rmAppTimeOutMonitor) { + this.rmAppTimeOutMonitor = rmAppTimeOutMonitor; + } + + @Override + public RMAppTimeOutMonitor getRMAppTimeOutMonitor() { + return this.rmAppTimeOutMonitor; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index d6d9629..3e48e9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppTimeOutMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -169,6 +170,8 @@ private UserGroupInformation rmLoginUGI; + protected RMAppTimeOutMonitor rmAppTimeOutMonitor; + public ResourceManager() { super("ResourceManager"); } @@ -556,7 +559,9 @@ protected void serviceInit(Configuration configuration) throws Exception { addService(delegationTokenRenewer); delegationTokenRenewer.setRMContext(rmContext); } - + rmAppTimeOutMonitor = createRMAppTimeOutMonitor(); + addService(rmAppTimeOutMonitor); + rmContext.setRMAppTimeOutMonitor(rmAppTimeOutMonitor); new RMNMInfo(rmContext, scheduler); super.serviceInit(conf); @@ -1282,4 +1287,9 @@ private static void printUsage(PrintStream out) { out.println(" " + "[-remove-application-from-state-store ]" + "\n"); } + + protected RMAppTimeOutMonitor createRMAppTimeOutMonitor() { + return new RMAppTimeOutMonitor(this.rmContext); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 2eb74f7..ea3f92e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -921,6 +921,17 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { app.diagnostics.append(msg); LOG.error(msg, e); } + + // Handle the timeout during recovery + long applicationTimeout = app.submissionContext.getApplicationTimeOut(); + if (applicationTimeout > 0) { + app.rmContext.getRMAppTimeOutMonitor().register(app, + app.submissionContext.getApplicationTimeOut() * 1000); + LOG.debug("Application " + + app.getApplicationId() + + " is registered with Application Timeout monitor. The timeout configured is " + + app.submissionContext.getApplicationTimeOut()); + } } // No existent attempts means the attempt associated with this app was not @@ -962,6 +973,16 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) { app.handler.handle(new AppAddedSchedulerEvent(app.user, app.submissionContext, false)); + + long applicationTimeout = app.submissionContext.getApplicationTimeOut(); + if (applicationTimeout > 0) { + app.rmContext.getRMAppTimeOutMonitor() + .register(app, applicationTimeout * 1000); + LOG.debug("Application " + + app.getApplicationId() + + " is registered with Application Timeout monitor. The timeout configured is " + + app.submissionContext.getApplicationTimeOut()); + } } } @@ -1112,6 +1133,7 @@ public FinalSavingTransition(Object transitionToDo, public void transition(RMAppImpl app, RMAppEvent event) { app.rememberTargetTransitionsAndStoreState(event, transitionToDo, targetedFinalState, stateToBeStored); + app.rmContext.getRMAppTimeOutMonitor().unregister(app); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppTimeOutMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppTimeOutMonitor.java new file mode 100644 index 0000000..758f199 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppTimeOutMonitor.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; + +public class RMAppTimeOutMonitor extends AbstractService { + + private static final Log LOG = LogFactory.getLog(RMAppTimeOutMonitor.class); + + private RMContext rmContext; + + private Clock clock; + private Thread rmAppTimeOutThread = null; + private volatile boolean stopped = false; + + private long threadSleepTime = 0; + + private ConcurrentMap rmApps = + new ConcurrentHashMap(); + + public RMAppTimeOutMonitor(RMContext rmContext) { + super(RMAppTimeOutMonitor.class.getName()); + this.rmContext = rmContext; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + clock = new SystemClock(); + long applicationTimeOurMonitorInterval = + conf.getLong(YarnConfiguration.RM_APP_MONITOR_INTERVAL_SEC, + YarnConfiguration.DEFAULT_RM_APP_MONITOR_INTERVAL_SEC); + this.threadSleepTime = + (applicationTimeOurMonitorInterval > 0) ? (applicationTimeOurMonitorInterval * 1000) + : (YarnConfiguration.DEFAULT_RM_APP_MONITOR_INTERVAL_SEC * 1000); + LOG.info("RMAppTimeOutThread interval configured as " + + this.threadSleepTime + " sec"); + } + + @Override + protected void serviceStart() throws Exception { + rmAppTimeOutThread = new RMAppTimeOutMonitorThread(); + rmAppTimeOutThread.setName("RMAppTimeOutMonitorThread"); + rmAppTimeOutThread.setDaemon(true); + rmAppTimeOutThread.start(); + LOG.info("Successfully started RMAppTimeOutMonitorThread"); + } + + private class RMAppTimeOutMonitorThread extends Thread { + + @SuppressWarnings("unchecked") + @Override + public void run() { + LOG.debug("Starting RMAppTimeOutMonitorThread."); + long timeOut; + RMApp application = null; + while (!stopped && !Thread.currentThread().isInterrupted()) { + try { + LOG.debug("Inside RMAppTimeOutMonitorThread.run"); + for (Map.Entry e : rmApps.entrySet()) { + timeOut = e.getValue(); + application = e.getKey(); + if (!application.isAppFinalStateStored()) { + long currentTime = clock.getTime(); + LOG.debug("Timeout : " + timeOut + " currentTime : " + + currentTime + " SubmitTime : " + + application.getSubmitTime()); + if ((currentTime - application.getSubmitTime()) > timeOut) { + rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMAppEvent(application.getApplicationId(), + RMAppEventType.KILL)); + LOG.warn("Application timed out. Issuing kill command on the application- Application id: " + + application.getApplicationId().toString()); + } + } else { + rmApps.remove(application); + LOG.debug("Application " + + application.getApplicationId() + + " is removed from the timeout monitor since it is completed"); + } + } + } catch (YarnRuntimeException runexe) { + LOG.error("YarnRuntimeException exception occured while killing application based on application timeout" + + runexe); + } catch (Exception exe) { + LOG.warn("Exception occurred while killing applicaiton " + + application.getApplicationId()); + } + try { + Thread.sleep(threadSleepTime); + } catch (InterruptedException e1) { + LOG.debug("RMAppTimeOut sleep is over. Going for next iteration."); + } + } + } + } + + @Override + protected void serviceStop() throws Exception { + stopped = true; + if (rmAppTimeOutThread != null) { + rmAppTimeOutThread.interrupt(); + try { + rmAppTimeOutThread.join(); + } catch (InterruptedException ie) { + LOG.warn("Interrupted Exception while stopping"); + } + } + super.serviceStop(); + } + + public synchronized void register(RMApp ob, long timeOut) { + if (!rmApps.containsKey(ob)) { + rmApps.put(ob, timeOut); + } + } + + public synchronized void unregister(RMApp ob) { + if (rmApps.containsKey(ob)) { + rmApps.remove(ob); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 5080355..8c11f0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -290,6 +290,7 @@ public RMApp submitApp(int masterMemory) throws Exception { return submitApp(masterMemory, false); } + public RMApp submitApp(int masterMemory, Priority priority) throws Exception { Resource resource = Resource.newInstance(masterMemory, 0); return submitApp(resource, "", UserGroupInformation.getCurrentUser() @@ -298,6 +299,16 @@ public RMApp submitApp(int masterMemory, Priority priority) throws Exception { YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, false, null, 0, null, true, priority); } + + public RMApp submitApp(int masterMemory, Priority priority, + long applicationTimeOut) throws Exception { + Resource resource = Resource.newInstance(masterMemory, 0); + return submitApp(resource, "", UserGroupInformation.getCurrentUser() + .getShortUserName(), null, false, null, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, + false, false, null, 0, null, true, priority, applicationTimeOut); + } public RMApp submitApp(int masterMemory, boolean unmanaged) throws Exception { @@ -426,23 +437,41 @@ public RMApp submitApp(Resource capability, String name, String user, boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, ApplicationId applicationId, long attemptFailuresValidityInterval, LogAggregationContext logAggregationContext, - boolean cancelTokensWhenComplete, Priority priority) + boolean cancelTokensWhenComplete, Priority priority) throws Exception { + return submitApp(capability, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, waitForAccepted, keepContainers, + isAppIdProvided, applicationId, attemptFailuresValidityInterval, + logAggregationContext, cancelTokensWhenComplete, priority, -1); + + } + + public RMApp submitApp(Resource capability, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, + ApplicationId applicationId, long attemptFailuresValidityInterval, + LogAggregationContext logAggregationContext, + boolean cancelTokensWhenComplete, Priority priority, + long applicationTimeOut) throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); - if (! isAppIdProvided) { - GetNewApplicationResponse resp = client.getNewApplication(Records - .newRecord(GetNewApplicationRequest.class)); + if (!isAppIdProvided) { + GetNewApplicationResponse resp = + client.getNewApplication(Records + .newRecord(GetNewApplicationRequest.class)); appId = resp.getApplicationId(); } - SubmitApplicationRequest req = Records - .newRecord(SubmitApplicationRequest.class); - ApplicationSubmissionContext sub = Records - .newRecord(ApplicationSubmissionContext.class); + SubmitApplicationRequest req = + Records.newRecord(SubmitApplicationRequest.class); + ApplicationSubmissionContext sub = + Records.newRecord(ApplicationSubmissionContext.class); sub.setKeepContainersAcrossApplicationAttempts(keepContainers); sub.setApplicationId(appId); sub.setApplicationName(name); sub.setMaxAppAttempts(maxAppAttempts); + if (-1 != applicationTimeOut) + sub.setApplicationTimeOut(applicationTimeOut); if (unmanaged) { sub.setUnmanagedAM(true); } @@ -453,14 +482,15 @@ public RMApp submitApp(Resource capability, String name, String user, sub.setPriority(priority); } sub.setApplicationType(appType); - ContainerLaunchContext clc = Records - .newRecord(ContainerLaunchContext.class); + ContainerLaunchContext clc = + Records.newRecord(ContainerLaunchContext.class); sub.setResource(capability); clc.setApplicationACLs(acls); if (ts != null && UserGroupInformation.isSecurityEnabled()) { DataOutputBuffer dob = new DataOutputBuffer(); ts.writeTokenStorageToStream(dob); - ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + ByteBuffer securityTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); clc.setTokens(securityTokens); } sub.setAMContainerSpec(clc); @@ -471,29 +501,32 @@ public RMApp submitApp(Resource capability, String name, String user, sub.setCancelTokensWhenComplete(cancelTokensWhenComplete); req.setApplicationSubmissionContext(sub); UserGroupInformation fakeUser = - UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"}); + UserGroupInformation.createUserForTesting(user, + new String[] { "someGroup" }); PrivilegedAction action = - new PrivilegedAction() { - ApplicationClientProtocol client; - SubmitApplicationRequest req; - @Override - public SubmitApplicationResponse run() { - try { - return client.submitApplication(req); - } catch (YarnException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - PrivilegedAction setClientReq( - ApplicationClientProtocol client, SubmitApplicationRequest req) { - this.client = client; - this.req = req; - return this; - } - }.setClientReq(client, req); + new PrivilegedAction() { + ApplicationClientProtocol client; + SubmitApplicationRequest req; + + @Override + public SubmitApplicationResponse run() { + try { + return client.submitApplication(req); + } catch (YarnException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + PrivilegedAction setClientReq( + ApplicationClientProtocol client, SubmitApplicationRequest req) { + this.client = client; + this.req = req; + return this; + } + }.setClientReq(client, req); fakeUser.doAs(action); // make sure app is immediately available after submit if (waitForAccepted) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationTimeOut.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationTimeOut.java new file mode 100644 index 0000000..072b4ad --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationTimeOut.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.junit.Assert; +import org.junit.Test; + +public class TestApplicationTimeOut { + private final int GB = 1024; + + @Test + public void testApplicationTimeOut() throws Exception { + + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.set(YarnConfiguration.RM_APP_MONITOR_INTERVAL_SEC, "1"); + // Set Max Application Priority as 10 + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); + MockRM rm = new MockRM(conf); + rm.start(); + + Priority appPriority = Priority.newInstance(0); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB); + long submitTime=System.currentTimeMillis(); + RMApp app1 = rm.submitApp(1 * GB, appPriority, 5); + + // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1 + MockAM am1 = MockRM.launchAM(app1, rm, nm1); + am1.registerAppAttempt(); + + rm.waitForState(app1.getApplicationId(), RMAppState.KILLED); + long endTime = System.currentTimeMillis(); + Assert.assertTrue("Applicaiton killed before timeout value", + (endTime - submitTime) > Long.valueOf("5")); + rm.stop(); + } + +} -- 1.9.2.msysgit.0