diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 394454f..6d39366 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -578,7 +578,7 @@ SignalContainerResponse signalToContainer( * Note: If application timeout value is less than or equal to current * time then update application throws YarnException. * @param request to set ApplicationTimeouts of an application - * @return an empty response that the update has completed successfully. + * @return a response with updated timeouts. * @throws YarnException if update request has empty values or application is * in completing states. * @throws IOException on IO failures diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java index bd02bb8..3770eb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.util.Records; /** @@ -43,4 +44,22 @@ public static UpdateApplicationTimeoutsResponse newInstance() { Records.newRecord(UpdateApplicationTimeoutsResponse.class); return response; } + + /** + * Get ApplicationTimeouts of the application. Timeout value is + * in ISO8601 standard with format yyyy-MM-dd'T'HH:mm:ss.SSSZ. + * @return all ApplicationTimeouts of the application. + */ + public abstract Map getApplicationTimeouts(); + + /** + * Set the ApplicationTimeouts for the application. Timeout value + * is absolute. Timeout value should meet ISO8601 format. Support ISO8601 + * format is yyyy-MM-dd'T'HH:mm:ss.SSSZ. All pre-existing Map entries + * are cleared before adding the new Map. + * @param applicationTimeouts ApplicationTimeoutss for the + * application + */ + public abstract void setApplicationTimeouts( + Map applicationTimeouts); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index 893348a..5f6b300 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -41,6 +41,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -359,10 +360,21 @@ private void updateApplicationTimeout(String applicationId, + timeoutType.toString() + " of an application " + applicationId); UpdateApplicationTimeoutsRequest request = UpdateApplicationTimeoutsRequest .newInstance(appId, Collections.singletonMap(timeoutType, newTimeout)); - client.updateApplicationTimeouts(request); + UpdateApplicationTimeoutsResponse updateApplicationTimeouts = + client.updateApplicationTimeouts(request); + String updatedTimeout = + updateApplicationTimeouts.getApplicationTimeouts().get(timeoutType); + + if (timeoutType.equals(ApplicationTimeoutType.LIFETIME) + && !newTimeout.equals(updatedTimeout)) { + sysout.println("Updated lifetime of an application " + applicationId + + " to queue max/default lifetime." + " New expiry time is " + + updatedTimeout); + return; + } sysout.println( "Successfully updated " + timeoutType.toString() + " of an application " - + applicationId + ". New expiry time is " + newTimeout); + + applicationId + ". New expiry time is " + updatedTimeout); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 3c35b9c..13730f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -48,6 +48,7 @@ import org.apache.commons.lang.time.DateFormatUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -2148,17 +2149,16 @@ public void testUpdateApplicationTimeout() throws Exception { ApplicationCLI cli = createAndGetAppCLI(); ApplicationId applicationId = ApplicationId.newInstance(1234, 6); - ApplicationReport appReport = ApplicationReport.newInstance(applicationId, - ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", - "appname", "host", 124, null, YarnApplicationState.RUNNING, - "diagnostics", "url", 0, 0, FinalApplicationStatus.UNDEFINED, null, - "N/A", 0.53789f, "YARN", null); - ApplicationTimeout timeout = ApplicationTimeout - .newInstance(ApplicationTimeoutType.LIFETIME, "N/A", -1); - appReport.setApplicationTimeouts( - Collections.singletonMap(timeout.getTimeoutType(), timeout)); - when(client.getApplicationReport(any(ApplicationId.class))) - .thenReturn(appReport); + UpdateApplicationTimeoutsResponse response = + mock(UpdateApplicationTimeoutsResponse.class); + String formatISO8601 = + Times.formatISO8601(System.currentTimeMillis() + 5 * 1000); + when(response.getApplicationTimeouts()).thenReturn(Collections + .singletonMap(ApplicationTimeoutType.LIFETIME, formatISO8601)); + + when(client + .updateApplicationTimeouts(any(UpdateApplicationTimeoutsRequest.class))) + .thenReturn(response); int result = cli.run(new String[] { "application", "-appId", applicationId.toString(), "-updateLifetime", "10" }); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java index 74f1715..52b4652 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java @@ -18,10 +18,21 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationUpdateTimeoutMapProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProtoOrBuilder; import com.google.protobuf.TextFormat; @@ -33,6 +44,7 @@ UpdateApplicationTimeoutsResponseProto.getDefaultInstance(); UpdateApplicationTimeoutsResponseProto.Builder builder = null; boolean viaProto = false; + private Map applicationTimeouts = null; public UpdateApplicationTimeoutsResponsePBImpl() { builder = UpdateApplicationTimeoutsResponseProto.newBuilder(); @@ -45,11 +57,33 @@ public UpdateApplicationTimeoutsResponsePBImpl( } public UpdateApplicationTimeoutsResponseProto getProto() { + mergeLocalToProto(); proto = viaProto ? proto : builder.build(); viaProto = true; return proto; } + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = UpdateApplicationTimeoutsResponseProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.applicationTimeouts != null) { + addApplicationTimeouts(); + } + } + @Override public int hashCode() { return getProto().hashCode(); @@ -70,4 +104,79 @@ public boolean equals(Object other) { public String toString() { return TextFormat.shortDebugString(getProto()); } + + @Override + public Map getApplicationTimeouts() { + initApplicationTimeout(); + return this.applicationTimeouts; + } + + private void initApplicationTimeout() { + if (this.applicationTimeouts != null) { + return; + } + UpdateApplicationTimeoutsResponseProtoOrBuilder p = + viaProto ? proto : builder; + List lists = + p.getApplicationTimeoutsList(); + this.applicationTimeouts = + new HashMap(lists.size()); + for (ApplicationUpdateTimeoutMapProto timeoutProto : lists) { + this.applicationTimeouts.put( + ProtoUtils + .convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()), + timeoutProto.getExpireTime()); + } + } + + @Override + public void setApplicationTimeouts( + Map appTimeouts) { + if (appTimeouts == null) { + return; + } + initApplicationTimeout(); + this.applicationTimeouts.clear(); + this.applicationTimeouts.putAll(appTimeouts); + } + + private void addApplicationTimeouts() { + maybeInitBuilder(); + builder.clearApplicationTimeouts(); + if (applicationTimeouts == null) { + return; + } + Iterable values = + new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iterator = + applicationTimeouts.keySet().iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public ApplicationUpdateTimeoutMapProto next() { + ApplicationTimeoutType key = iterator.next(); + return ApplicationUpdateTimeoutMapProto.newBuilder() + .setExpireTime(applicationTimeouts.get(key)) + .setApplicationTimeoutType( + ProtoUtils.convertToProtoFormat(key)) + .build(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + this.builder.addAllApplicationTimeouts(values); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml index 785ed04..2d40e58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml @@ -107,6 +107,34 @@ + yarn.scheduler.capacity.root.default.maximum-application-lifetime + -1 + + Maximum lifetime of an applications submitted to a queue + in seconds. Any value less than or equal to zero is considered as disabled. + It is hard limit for any applications submitting in this + queue. If positive value is configured then any application submitted + to this queue will be killed after exceeds configured lifetime. + User can also specify lifetime per application basis in + application submission context. But user lifetime will be + overridden if it exceeds queue maximum lifetime. It is point-in-time configuration. + Note : Configuring too low value will result it killing application sooner. + + + + + yarn.scheduler.capacity.root.default.default-application-lifetime + -1 + + Default lifetime of an applications submitted to a queue + in seconds. Any value less than or equal to zero is considered as disabled. + If the user is not submitted with lifetime value then this + value will be taken. It is point-in-time configuration. + Note : Default lifetime cann't exceeds maximum lifetime! + + + + yarn.scheduler.capacity.node-locality-delay 40 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index e6c25ad..df38893 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -1691,6 +1691,7 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( RMAuditLogger.logSuccess(callerUGI.getShortUserName(), AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", applicationId); + response.setApplicationTimeouts(applicationTimeouts); return response; } String msg = @@ -1702,7 +1703,8 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( } try { - rmAppManager.updateApplicationTimeout(application, applicationTimeouts); + applicationTimeouts = rmAppManager.updateApplicationTimeout(application, + applicationTimeouts); } catch (YarnException ex) { RMAuditLogger.logFailure(callerUGI.getShortUserName(), AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService", @@ -1712,6 +1714,7 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( RMAuditLogger.logSuccess(callerUGI.getShortUserName(), AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", applicationId); + response.setApplicationTimeouts(applicationTimeouts); return response; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index bcd1a9c..56ac82a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Times; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; @@ -571,18 +572,42 @@ public void handle(RMAppManagerEvent event) { } // transaction method. - public void updateApplicationTimeout(RMApp app, + public Map updateApplicationTimeout(RMApp app, Map newTimeoutInISO8601Format) throws YarnException { ApplicationId applicationId = app.getApplicationId(); synchronized (applicationId) { if (app.isAppInCompletedStates()) { - return; + return newTimeoutInISO8601Format; } Map newExpireTime = RMServerUtils .validateISO8601AndConvertToLocalTimeEpoch(newTimeoutInISO8601Format); + // validation is only for lifetime + Long updatedlifetimeInMills = + newExpireTime.get(ApplicationTimeoutType.LIFETIME); + if (updatedlifetimeInMills != null) { + // always send Long#MAX_VALUE to get queue maximum lifetime value! + long queueMaxLifetimeInSec = + scheduler.checkAndGetApplicationLifetime(app.getQueue(), Long.MAX_VALUE); + + if (queueMaxLifetimeInSec > 0) { + if (updatedlifetimeInMills > (app.getSubmitTime() + + queueMaxLifetimeInSec * 1000)) { + updatedlifetimeInMills = + app.getSubmitTime() + queueMaxLifetimeInSec * 1000; + // cut off to maximum queue lifetime if update lifetime is exceeding + // queue lifetime. + newExpireTime.put(ApplicationTimeoutType.LIFETIME, + updatedlifetimeInMills); + + newTimeoutInISO8601Format.put(ApplicationTimeoutType.LIFETIME, + Times.formatISO8601(updatedlifetimeInMills.longValue())); + } + } + } + SettableFuture future = SettableFuture.create(); Map currentExpireTimeouts = @@ -605,6 +630,8 @@ public void updateApplicationTimeout(RMApp app, // update in-memory ((RMAppImpl) app).updateApplicationTimeout(newExpireTime); + + return newTimeoutInISO8601Format; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 13b0792..665f458 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1225,6 +1225,8 @@ public void transition(RMAppImpl app, RMAppEvent event) { long applicationLifetime = app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME); + applicationLifetime = app.scheduler + .checkAndGetApplicationLifetime(app.queue, applicationLifetime); if (applicationLifetime > 0) { // calculate next timeout value Long newTimeout = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 2c27017..43a10e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -1283,4 +1283,10 @@ public void asyncContainerRelease(RMContainer container) { this.rmContext.getDispatcher().getEventHandler() .handle(new ReleaseContainerEvent(container)); } + + @Override + public long checkAndGetApplicationLifetime(String queueName, long lifetime) { + // -1 indicates, lifetime is not configured. + return -1; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 08e0603..0f88d19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -385,4 +385,15 @@ public Priority updateApplicationPriority(Priority newPriority, * @return the normalized resource */ Resource getNormalizedResource(Resource requestedResource); + + /** + * Verify whether a submitted application lifetime is valid as per configured + * Queue lifetime + * @param queueName Name of the Queue + * @param lifetime requestedByApp + * @return valid lifetime as per queue + */ + @Public + @Unstable + long checkAndGetApplicationLifetime(String queueName, long lifetime); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index fde84c4..84472df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2564,4 +2564,36 @@ public boolean moveReservedContainer(RMContainer toBeMovedContainer, writeLock.unlock(); } } + + @Override + public long checkAndGetApplicationLifetime(String queueName, + long lifetimeRequestedByApp) { + try { + readLock.lock(); + CSQueue queue = getQueue(queueName); + if (queue == null || !(queue instanceof LeafQueue)) { + return lifetimeRequestedByApp; + } + + long defaultApplicationLifetime = + ((LeafQueue) queue).getDefaultApplicationLifetime(); + long maximumApplicationLifetime = + ((LeafQueue) queue).getMaximumApplicationLifetime(); + + // check only for maximum, that's enough because default cann't + // exceed maximum + if (maximumApplicationLifetime <= 0) { + return lifetimeRequestedByApp; + } + + if (lifetimeRequestedByApp <= 0) { + return defaultApplicationLifetime; + } else if (lifetimeRequestedByApp > maximumApplicationLifetime) { + return ((LeafQueue) queue).getMaximumApplicationLifetime(); + } + return lifetimeRequestedByApp; + } finally { + readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 13b9ff6..57daeb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1496,4 +1496,28 @@ public boolean getAssignMultipleEnabled() { public int getMaxAssignPerHeartbeat() { return getInt(MAX_ASSIGN_PER_HEARTBEAT, DEFAULT_MAX_ASSIGN_PER_HEARTBEAT); } + + public static final String MAXIMUM_LIFETIME_SUFFIX = "maximum-application-lifetime"; + + public static final String DEFAULT_LIFETIME_SUFFIX = "default-application-lifetime"; + + public long getMaximumLifetimePerQueue(String queue) { + long maximumLifetimePerQueue = getLong( + getQueuePrefix(queue) + MAXIMUM_LIFETIME_SUFFIX, (long) UNDEFINED); + return maximumLifetimePerQueue; + } + + public void setMaximumLifetimePerQueue(String queue, long maximumLifetime) { + setLong(getQueuePrefix(queue) + MAXIMUM_LIFETIME_SUFFIX, maximumLifetime); + } + + public long getDefaultLifetimePerQueue(String queue) { + long maximumLifetimePerQueue = getLong( + getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, (long) UNDEFINED); + return maximumLifetimePerQueue; + } + + public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) { + setLong(getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, defaultLifetime); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index d15431e..22ee01b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; @@ -130,6 +131,10 @@ List priorityAcls = new ArrayList(); + // -1 indicates lifetime is disabled + private volatile long maxLifetime = -1; + private volatile long defaultLifetime = -1; + @SuppressWarnings({ "unchecked", "rawtypes" }) public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -237,6 +242,13 @@ protected void setupQueueConfigs(Resource clusterResource) defaultAppPriorityPerQueue = Priority.newInstance( conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath())); + + maxLifetime = conf.getMaximumLifetimePerQueue((getQueuePath())); + defaultLifetime = conf.getDefaultLifetimePerQueue((getQueuePath())); + if (defaultLifetime > maxLifetime) { + throw new YarnRuntimeException("Default lifetime" + defaultLifetime + + " cann't exceed maximum lifetime " + maxLifetime); + } // Validate leaf queue's user's weights. int queueUL = Math.min(100, conf.getUserLimit(getQueuePath())); @@ -293,7 +305,9 @@ protected void setupQueueConfigs(Resource clusterResource) + "reservationsContinueLooking = " + reservationsContinueLooking + "\n" + "preemptionDisabled = " + getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = " - + defaultAppPriorityPerQueue + "\npriority = " + priority); + + defaultAppPriorityPerQueue + "\npriority = " + priority + + "\nmaxLifetime = " + maxLifetime + " seconds" + + "\ndefaultLifetime = " + defaultLifetime + " seconds"); } finally { writeLock.unlock(); } @@ -2086,4 +2100,12 @@ public void stopQueue() { this.userLimit = userLimit; } } + + public long getMaximumApplicationLifetime() { + return maxLifetime; + } + + public long getDefaultApplicationLifetime() { + return defaultLifetime; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java index f7e76bb..0f0ad73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -48,8 +49,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestWorkPreservingRMRestart; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.Times; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -67,6 +73,9 @@ @Before public void setup() throws IOException { conf = new YarnConfiguration(); + // Always run for CS, since other scheduler do not support this. + conf.setClass(YarnConfiguration.RM_SCHEDULER, + CapacityScheduler.class, ResourceScheduler.class); Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); UserGroupInformation.setConfiguration(conf); @@ -78,8 +87,15 @@ public void setup() throws IOException { public void testApplicationLifetimeMonitor() throws Exception { MockRM rm = null; try { + long maxLifetime = 30L; + long defaultLifetime = 15L; + + YarnConfiguration newConf = + new YarnConfiguration(setUpCSQueue(maxLifetime, defaultLifetime)); + conf = new YarnConfiguration(newConf); rm = new MockRM(conf); rm.start(); + Priority appPriority = Priority.newInstance(0); MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * 1024); @@ -91,6 +107,13 @@ public void testApplicationLifetimeMonitor() throws Exception { // 20L seconds timeouts.put(ApplicationTimeoutType.LIFETIME, 20L); RMApp app2 = rm.submitApp(1024, appPriority, timeouts); + + // user not set lifetime, so queue max lifetime will be considered. + RMApp app3 = rm.submitApp(1024, appPriority, Collections.emptyMap()); + + // asc lifetime exceeds queue max lifetime + timeouts.put(ApplicationTimeoutType.LIFETIME, 40L); + RMApp app4 = rm.submitApp(1024, appPriority, timeouts); nm1.nodeHeartbeat(true); // Send launch Event @@ -103,8 +126,9 @@ public void testApplicationLifetimeMonitor() throws Exception { Map updateTimeout = new HashMap(); - long newLifetime = 10L; - // update 10L seconds more to timeout + long newLifetime = 40L; + // update 30L seconds more to timeout which is greater than queue max + // lifetime String formatISO8601 = Times.formatISO8601(System.currentTimeMillis() + newLifetime * 1000); updateTimeout.put(ApplicationTimeoutType.LIFETIME, formatISO8601); @@ -142,8 +166,6 @@ public void testApplicationLifetimeMonitor() throws Exception { !appTimeouts.isEmpty()); ApplicationTimeout timeout = appTimeouts.get(ApplicationTimeoutType.LIFETIME); - Assert.assertEquals("Application timeout string is incorrect.", - formatISO8601, timeout.getExpiryTime()); Assert.assertTrue("Application remaining time is incorrect", timeout.getRemainingTime() > 0); @@ -151,7 +173,18 @@ public void testApplicationLifetimeMonitor() throws Exception { // verify for app killed with updated lifetime Assert.assertTrue("Application killed before lifetime value", app2.getFinishTime() > afterUpdate); + + rm.waitForState(app3.getApplicationId(), RMAppState.KILLED); + // app4 submitted exceeding queue max lifetime, so killed after queue max + // lifetime. + rm.waitForState(app4.getApplicationId(), RMAppState.KILLED); + long totalTimeRun = (app4.getFinishTime() - app4.getSubmitTime()) / 1000; + Assert.assertTrue("Application killed before lifetime value", + totalTimeRun > maxLifetime); + Assert.assertTrue( + "Application killed before lifetime value " + totalTimeRun, + totalTimeRun < maxLifetime + 10L); } finally { stopRM(rm); } @@ -172,7 +205,7 @@ public void testApplicationLifetimeOnRMRestart() throws Exception { nm1.registerNode(); nm1.nodeHeartbeat(true); - long appLifetime = 60L; + long appLifetime = 30L; Map timeouts = new HashMap(); timeouts.put(ApplicationTimeoutType.LIFETIME, appLifetime); @@ -304,6 +337,21 @@ public synchronized void updateApplicationStateInternal( stopRM(rm1); } } + + private CapacitySchedulerConfiguration setUpCSQueue(long maxLifetime, + long defaultLifetime) { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "default" }); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".default", 100); + csConf.setMaximumLifetimePerQueue( + CapacitySchedulerConfiguration.ROOT + ".default", maxLifetime); + csConf.setDefaultLifetimePerQueue( + CapacitySchedulerConfiguration.ROOT + ".default", defaultLifetime); + + return csConf; + } private void stopRM(MockRM rm) { if (rm != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index a526222..296144b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -4909,6 +4909,78 @@ public void testAMLimitDouble() throws Exception { rm.stop(); } + @Test(timeout = 30000) + public void testcheckAndGetApplicationLifetime() throws Exception { + long maxLifetime = 10; + long defaultLifetime = 5; + // positive integer value + CapacityScheduler cs = setUpCSQueue(maxLifetime, defaultLifetime); + Assert.assertEquals(maxLifetime, + cs.checkAndGetApplicationLifetime("default", 100)); + Assert.assertEquals(9, cs.checkAndGetApplicationLifetime("default", 9)); + Assert.assertEquals(defaultLifetime, + cs.checkAndGetApplicationLifetime("default", -1)); + Assert.assertEquals(defaultLifetime, cs.checkAndGetApplicationLifetime("default", 0)); + + maxLifetime = -1; + defaultLifetime = -1; + // test for default values + cs = setUpCSQueue(maxLifetime, defaultLifetime); + Assert.assertEquals(100, cs.checkAndGetApplicationLifetime("default", 100)); + Assert.assertEquals(defaultLifetime, + cs.checkAndGetApplicationLifetime("default", -1)); + Assert.assertEquals(0, + cs.checkAndGetApplicationLifetime("default", 0)); + + maxLifetime = 10; + defaultLifetime = 10; + cs = setUpCSQueue(maxLifetime, defaultLifetime); + Assert.assertEquals(maxLifetime, cs.checkAndGetApplicationLifetime("default", 100)); + Assert.assertEquals(defaultLifetime, cs.checkAndGetApplicationLifetime("default", -1)); + Assert.assertEquals(defaultLifetime, cs.checkAndGetApplicationLifetime("default", 0)); + + maxLifetime = 0; + defaultLifetime = 0; + cs = setUpCSQueue(maxLifetime, defaultLifetime); + Assert.assertEquals(100, cs.checkAndGetApplicationLifetime("default", 100)); + Assert.assertEquals(-1, + cs.checkAndGetApplicationLifetime("default", -1)); + Assert.assertEquals(0, + cs.checkAndGetApplicationLifetime("default", 0)); + + maxLifetime = 5; + defaultLifetime = 10; + try { + setUpCSQueue(maxLifetime, defaultLifetime); + Assert.fail("Expected to fails since maxLifetime < defaultLifetime."); + } catch (YarnRuntimeException ye) { + Assert.assertTrue( + ye.getMessage().contains("cann't exceed maximum lifetime")); + } + } + + private CapacityScheduler setUpCSQueue(long maxLifetime, long defaultLifetime) { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "default" }); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".default", 100); + csConf.setMaximumLifetimePerQueue( + CapacitySchedulerConfiguration.ROOT + ".default", maxLifetime); + csConf.setDefaultLifetimePerQueue( + CapacitySchedulerConfiguration.ROOT + ".default", defaultLifetime); + + YarnConfiguration conf = new YarnConfiguration(csConf); + CapacityScheduler cs = new CapacityScheduler(); + + RMContext rmContext = TestUtils.getMockRMContext(); + cs.setConf(conf); + cs.setRMContext(rmContext); + cs.init(conf); + + return cs; + } + private void waitforNMRegistered(ResourceScheduler scheduler, int nodecount, int timesec) throws InterruptedException { long start = System.currentTimeMillis(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md index f1d4535..ff1ef80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md @@ -171,6 +171,16 @@ Example: ``` + * Queue lifetime for applications + + The `CapacityScheduler` supports the following parameters to lifetime of an application: + +| Property | Description | +|:---- |:---- | +| `yarn.scheduler.capacity..maximum-application-lifetime` | Maximum lifetime of an applications submitted to a queue in seconds. Any value less than or equal to zero is considered as disabled. It is hard limit for any applications submitting in this queue. If positive value is configured then any application submitted to this queue will be killed after exceeds configured lifetime. User can also specify lifetime per application basis in application submission context. But user lifetime will be overridden if it exceeds queue maximum lifetime. It is *point-in-time* configuration. *Note* : Configuring too low value will result it killing application sooner.. | +| `yarn.scheduler.capacity.root..default-application-lifetime` | Default lifetime of an applications submitted to a queue in seconds. Any value less than or equal to zero is considered as disabled. If the user is not submitted with lifetime value then this value will be taken. It is *point-in-time* configuration. *Note* : Default lifetime cann't exceeds maximum lifetime!. | + + ###Setup for application priority. Application priority works only along with FIFO ordering policy. Default ordering policy is FIFO.