From 4551f39f0600cbae571dc440f71976a01142c6b2 Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Tue, 27 Jun 2017 16:05:57 -0700 Subject: [PATCH] YARN-6600 --- .../hadoop/yarn/api/ApplicationClientProtocol.java | 2 +- .../UpdateApplicationTimeoutsResponse.java | 19 ++++ .../hadoop/yarn/client/cli/ApplicationCLI.java | 16 ++- .../apache/hadoop/yarn/client/cli/TestYarnCLI.java | 22 ++--- .../UpdateApplicationTimeoutsResponsePBImpl.java | 109 +++++++++++++++++++++ .../conf/capacity-scheduler.xml | 15 +++ .../server/resourcemanager/ClientRMService.java | 4 +- .../yarn/server/resourcemanager/RMAppManager.java | 30 +++++- .../server/resourcemanager/rmapp/RMAppImpl.java | 14 +++ .../scheduler/AbstractYarnScheduler.java | 6 ++ .../resourcemanager/scheduler/YarnScheduler.java | 2 + .../scheduler/capacity/CapacityScheduler.java | 11 +++ .../capacity/CapacitySchedulerConfiguration.java | 8 ++ .../scheduler/capacity/LeafQueue.java | 12 ++- .../TestApplicationLifetimeMonitor.java | 56 +++++++++-- 15 files changed, 302 insertions(+), 24 deletions(-) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/{rmapp => }/TestApplicationLifetimeMonitor.java (85%) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 394454f..6d39366 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -578,7 +578,7 @@ SignalContainerResponse signalToContainer( * Note: If application timeout value is less than or equal to current * time then update application throws YarnException. * @param request to set ApplicationTimeouts of an application - * @return an empty response that the update has completed successfully. + * @return a response with updated timeouts. * @throws YarnException if update request has empty values or application is * in completing states. * @throws IOException on IO failures diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java index bd02bb8..3770eb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.util.Records; /** @@ -43,4 +44,22 @@ public static UpdateApplicationTimeoutsResponse newInstance() { Records.newRecord(UpdateApplicationTimeoutsResponse.class); return response; } + + /** + * Get ApplicationTimeouts of the application. Timeout value is + * in ISO8601 standard with format yyyy-MM-dd'T'HH:mm:ss.SSSZ. + * @return all ApplicationTimeouts of the application. + */ + public abstract Map getApplicationTimeouts(); + + /** + * Set the ApplicationTimeouts for the application. Timeout value + * is absolute. Timeout value should meet ISO8601 format. Support ISO8601 + * format is yyyy-MM-dd'T'HH:mm:ss.SSSZ. All pre-existing Map entries + * are cleared before adding the new Map. + * @param applicationTimeouts ApplicationTimeoutss for the + * application + */ + public abstract void setApplicationTimeouts( + Map applicationTimeouts); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index 893348a..ead4fc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -41,6 +41,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -359,10 +360,21 @@ private void updateApplicationTimeout(String applicationId, + timeoutType.toString() + " of an application " + applicationId); UpdateApplicationTimeoutsRequest request = UpdateApplicationTimeoutsRequest .newInstance(appId, Collections.singletonMap(timeoutType, newTimeout)); - client.updateApplicationTimeouts(request); + UpdateApplicationTimeoutsResponse updateApplicationTimeouts = + client.updateApplicationTimeouts(request); + String updatedTimeout = + updateApplicationTimeouts.getApplicationTimeouts().get(timeoutType); + + if (timeoutType.equals(ApplicationTimeoutType.LIFETIME) + && !newTimeout.equals(updatedTimeout)) { + sysout.println("Updated lifetime of an application " + applicationId + + " to queue max lifetime" + ". New expiry time is " + + updatedTimeout); + return; + } sysout.println( "Successfully updated " + timeoutType.toString() + " of an application " - + applicationId + ". New expiry time is " + newTimeout); + + applicationId + ". New expiry time is " + updatedTimeout); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 3c35b9c..13730f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -48,6 +48,7 @@ import org.apache.commons.lang.time.DateFormatUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -2148,17 +2149,16 @@ public void testUpdateApplicationTimeout() throws Exception { ApplicationCLI cli = createAndGetAppCLI(); ApplicationId applicationId = ApplicationId.newInstance(1234, 6); - ApplicationReport appReport = ApplicationReport.newInstance(applicationId, - ApplicationAttemptId.newInstance(applicationId, 1), "user", "queue", - "appname", "host", 124, null, YarnApplicationState.RUNNING, - "diagnostics", "url", 0, 0, FinalApplicationStatus.UNDEFINED, null, - "N/A", 0.53789f, "YARN", null); - ApplicationTimeout timeout = ApplicationTimeout - .newInstance(ApplicationTimeoutType.LIFETIME, "N/A", -1); - appReport.setApplicationTimeouts( - Collections.singletonMap(timeout.getTimeoutType(), timeout)); - when(client.getApplicationReport(any(ApplicationId.class))) - .thenReturn(appReport); + UpdateApplicationTimeoutsResponse response = + mock(UpdateApplicationTimeoutsResponse.class); + String formatISO8601 = + Times.formatISO8601(System.currentTimeMillis() + 5 * 1000); + when(response.getApplicationTimeouts()).thenReturn(Collections + .singletonMap(ApplicationTimeoutType.LIFETIME, formatISO8601)); + + when(client + .updateApplicationTimeouts(any(UpdateApplicationTimeoutsRequest.class))) + .thenReturn(response); int result = cli.run(new String[] { "application", "-appId", applicationId.toString(), "-updateLifetime", "10" }); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java index 74f1715..52b4652 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java @@ -18,10 +18,21 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationUpdateTimeoutMapProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProtoOrBuilder; import com.google.protobuf.TextFormat; @@ -33,6 +44,7 @@ UpdateApplicationTimeoutsResponseProto.getDefaultInstance(); UpdateApplicationTimeoutsResponseProto.Builder builder = null; boolean viaProto = false; + private Map applicationTimeouts = null; public UpdateApplicationTimeoutsResponsePBImpl() { builder = UpdateApplicationTimeoutsResponseProto.newBuilder(); @@ -45,11 +57,33 @@ public UpdateApplicationTimeoutsResponsePBImpl( } public UpdateApplicationTimeoutsResponseProto getProto() { + mergeLocalToProto(); proto = viaProto ? proto : builder.build(); viaProto = true; return proto; } + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = UpdateApplicationTimeoutsResponseProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.applicationTimeouts != null) { + addApplicationTimeouts(); + } + } + @Override public int hashCode() { return getProto().hashCode(); @@ -70,4 +104,79 @@ public boolean equals(Object other) { public String toString() { return TextFormat.shortDebugString(getProto()); } + + @Override + public Map getApplicationTimeouts() { + initApplicationTimeout(); + return this.applicationTimeouts; + } + + private void initApplicationTimeout() { + if (this.applicationTimeouts != null) { + return; + } + UpdateApplicationTimeoutsResponseProtoOrBuilder p = + viaProto ? proto : builder; + List lists = + p.getApplicationTimeoutsList(); + this.applicationTimeouts = + new HashMap(lists.size()); + for (ApplicationUpdateTimeoutMapProto timeoutProto : lists) { + this.applicationTimeouts.put( + ProtoUtils + .convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()), + timeoutProto.getExpireTime()); + } + } + + @Override + public void setApplicationTimeouts( + Map appTimeouts) { + if (appTimeouts == null) { + return; + } + initApplicationTimeout(); + this.applicationTimeouts.clear(); + this.applicationTimeouts.putAll(appTimeouts); + } + + private void addApplicationTimeouts() { + maybeInitBuilder(); + builder.clearApplicationTimeouts(); + if (applicationTimeouts == null) { + return; + } + Iterable values = + new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iterator = + applicationTimeouts.keySet().iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public ApplicationUpdateTimeoutMapProto next() { + ApplicationTimeoutType key = iterator.next(); + return ApplicationUpdateTimeoutMapProto.newBuilder() + .setExpireTime(applicationTimeouts.get(key)) + .setApplicationTimeoutType( + ProtoUtils.convertToProtoFormat(key)) + .build(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + this.builder.addAllApplicationTimeouts(values); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml index 785ed04..e8ce3d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml @@ -107,6 +107,21 @@ + yarn.scheduler.capacity.root.default.maximum-lifetime + -1 + + Maximum lifetime of an applications submitted to a queue + in seconds. It is hard limit for any applications submitting in this + queue. If positive value is configured then any application submitted + to this queue will be killed after exceeds configured lifetime. + User can also specify lifetime per application basis in + application submission context. But user lifetime will be + overridden if it exceeds queue maximum lifetime. + Note : Configuring too low value will result it killing application sooner. + + + + yarn.scheduler.capacity.node-locality-delay 40 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 8b28d65..248953a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -1698,7 +1698,8 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( } try { - rmAppManager.updateApplicationTimeout(application, applicationTimeouts); + applicationTimeouts = rmAppManager.updateApplicationTimeout(application, + applicationTimeouts); } catch (YarnException ex) { RMAuditLogger.logFailure(callerUGI.getShortUserName(), AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService", @@ -1708,6 +1709,7 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( RMAuditLogger.logSuccess(callerUGI.getShortUserName(), AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", applicationId); + response.setApplicationTimeouts(applicationTimeouts); return response; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 368832a..32dcbc7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Times; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; @@ -623,18 +624,41 @@ public void handle(RMAppManagerEvent event) { } // transaction method. - public void updateApplicationTimeout(RMApp app, + public Map updateApplicationTimeout(RMApp app, Map newTimeoutInISO8601Format) throws YarnException { ApplicationId applicationId = app.getApplicationId(); synchronized (applicationId) { if (app.isAppInCompletedStates()) { - return; + return newTimeoutInISO8601Format; } Map newExpireTime = RMServerUtils .validateISO8601AndConvertToLocalTimeEpoch(newTimeoutInISO8601Format); + // validation is only for lifetime + Long updatedlifetimeInMills = + newExpireTime.get(ApplicationTimeoutType.LIFETIME); + if (updatedlifetimeInMills != null) { + long queueMaxLifetimeInSec = + scheduler.getMaximumLifetime(app.getQueue()); + // queue maximum lifetime is disabled + if (queueMaxLifetimeInSec > 0) { + if (updatedlifetimeInMills > (app.getSubmitTime() + + queueMaxLifetimeInSec * 1000)) { + updatedlifetimeInMills = + app.getSubmitTime() + queueMaxLifetimeInSec * 1000; + // cut off to maximum queue lifetime if update lifetime is exceeding + // queue lifetime. + newExpireTime.put(ApplicationTimeoutType.LIFETIME, + updatedlifetimeInMills); + + newTimeoutInISO8601Format.put(ApplicationTimeoutType.LIFETIME, + Times.formatISO8601(updatedlifetimeInMills.longValue())); + } + } + } + SettableFuture future = SettableFuture.create(); Map currentExpireTimeouts = @@ -657,6 +681,8 @@ public void updateApplicationTimeout(RMApp app, // update in-memory ((RMAppImpl) app).updateApplicationTimeout(newExpireTime); + + return newTimeoutInISO8601Format; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index bf8fa4f..340bff3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1219,6 +1219,20 @@ public void transition(RMAppImpl app, RMAppEvent event) { long applicationLifetime = app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME); + long queueMaxLifetime = app.scheduler.getMaximumLifetime(app.queue); + // if queue max lifetime is set + if (queueMaxLifetime > 0) { + if (applicationLifetime > 0) { + // if app lifetime is greater than queue max lifetime then consider + // queue max lifetime + applicationLifetime = applicationLifetime > queueMaxLifetime + ? queueMaxLifetime : applicationLifetime; + } else { + // if app lifetime is disabled and queue max lifetime is set then + // consider queue max lifetime. + applicationLifetime = queueMaxLifetime; + } + } if (applicationLifetime > 0) { // calculate next timeout value Long newTimeout = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index c00b7be..9bb41ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -1262,4 +1262,10 @@ protected void rollbackContainerUpdate( public List getNodeIds(String resourceName) { return nodeTracker.getNodeIdsByResourceName(resourceName); } + + @Override + public long getMaximumLifetime(String queueName) { + // -1 indicates, lifetime is not configured. + return -1; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 08e0603..da0355f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -385,4 +385,6 @@ public Priority updateApplicationPriority(Priority newPriority, * @return the normalized resource */ Resource getNormalizedResource(Resource requestedResource); + + long getMaximumLifetime(String queueName); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index d3186da..40445ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2484,4 +2484,15 @@ public boolean moveReservedContainer(RMContainer toBeMovedContainer, writeLock.unlock(); } } + + @Override + public long getMaximumLifetime(String queueName) { + CSQueue queue = getQueue(queueName); + if (queue == null || !(queue instanceof LeafQueue)) { + LOG.error("Unknown queue: " + queueName); + return -1; + } + // In seconds + return ((LeafQueue) queue).getMaximumLifetime(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 90a7e65..e0cf522 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1446,4 +1446,12 @@ public void setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation( } return userWeights; } + + public static final String MAXIMUM_LIFETIME_SUFFIX = "maximum-lifetime"; + + public long getMaximumLifetimePerQueue(String queue) { + long maximumLifetimePerQueue = getLong( + getQueuePrefix(queue) + MAXIMUM_LIFETIME_SUFFIX, (long) UNDEFINED); + return maximumLifetimePerQueue; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 013a5ac..db24026 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -130,6 +130,9 @@ List priorityAcls = new ArrayList(); + // -1 indicates lifetime is disabled + private long maxLifetime = -1; + @SuppressWarnings({ "unchecked", "rawtypes" }) public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -237,6 +240,8 @@ protected void setupQueueConfigs(Resource clusterResource) defaultAppPriorityPerQueue = Priority.newInstance( conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath())); + + maxLifetime = conf.getMaximumLifetimePerQueue((getQueuePath())); // Validate leaf queue's user's weights. int queueUL = Math.min(100, conf.getUserLimit(getQueuePath())); @@ -293,7 +298,8 @@ protected void setupQueueConfigs(Resource clusterResource) + "reservationsContinueLooking = " + reservationsContinueLooking + "\n" + "preemptionDisabled = " + getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = " - + defaultAppPriorityPerQueue + "\npriority = " + priority); + + defaultAppPriorityPerQueue + "\npriority = " + priority + + "\nmaxLifetime = " + maxLifetime + " second"); } finally { writeLock.unlock(); } @@ -2052,4 +2058,8 @@ public void stopQueue() { public Set getAllUsers() { return this.getUsersManager().getUsers().keySet(); } + + public long getMaximumLifetime() { + return maxLifetime; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationLifetimeMonitor.java similarity index 85% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationLifetimeMonitor.java index fdc47b9..884b0b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationLifetimeMonitor.java @@ -16,13 +16,15 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.resourcemanager.rmapp; +package org.apache.hadoop.yarn.server.resourcemanager; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -48,8 +50,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestWorkPreservingRMRestart; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.util.Times; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -57,6 +62,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; /** * Test class for application life time monitor feature test. @@ -74,12 +80,24 @@ public void setup() throws IOException { 3000L); } - @Test(timeout = 60000) + @Test +// @Test(timeout = 60000) public void testApplicationLifetimeMonitor() throws Exception { MockRM rm = null; + long queueMaxLifetime = 30L; + try { - rm = new MockRM(conf); + rm = new MockRM(conf) { + protected ResourceScheduler createScheduler() { + ResourceScheduler spyScheduler = Mockito.spy(super.createScheduler()); + Mockito.doReturn(queueMaxLifetime).when(spyScheduler) + .getMaximumLifetime(any(String.class)); + return spyScheduler; + } + }; + rm.start(); + Priority appPriority = Priority.newInstance(0); MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * 1024); @@ -92,6 +110,13 @@ public void testApplicationLifetimeMonitor() throws Exception { timeouts.put(ApplicationTimeoutType.LIFETIME, 20L); RMApp app2 = rm.submitApp(1024, appPriority, timeouts); + // user not set lifetime, so queue max lifetime will be considered. + RMApp app3 = rm.submitApp(1024, appPriority, Collections.emptyMap()); + + // asc lifetime exceeds queue max lifetime + timeouts.put(ApplicationTimeoutType.LIFETIME, 40L); + RMApp app4 = rm.submitApp(1024, appPriority, timeouts); + nm1.nodeHeartbeat(true); // Send launch Event MockAM am1 = @@ -103,8 +128,9 @@ public void testApplicationLifetimeMonitor() throws Exception { Map updateTimeout = new HashMap(); - long newLifetime = 10L; - // update 10L seconds more to timeout + long newLifetime = 30L; + // update 30L seconds more to timeout which is greater than queue max + // lifetime String formatISO8601 = Times.formatISO8601(System.currentTimeMillis() + newLifetime * 1000); updateTimeout.put(ApplicationTimeoutType.LIFETIME, formatISO8601); @@ -143,7 +169,10 @@ public void testApplicationLifetimeMonitor() throws Exception { ApplicationTimeout timeout = appTimeouts.get(ApplicationTimeoutType.LIFETIME); Assert.assertEquals("Application timeout string is incorrect.", - formatISO8601, timeout.getExpiryTime()); + Times.formatISO8601(app2.getSubmitTime() + + rm.getResourceScheduler().getMaximumLifetime(app2.getQueue()) + * 1000), + timeout.getExpiryTime()); Assert.assertTrue("Application remaining time is incorrect", timeout.getRemainingTime() > 0); @@ -152,6 +181,21 @@ public void testApplicationLifetimeMonitor() throws Exception { Assert.assertTrue("Application killed before lifetime value", app2.getFinishTime() > afterUpdate); + // app3 submitted without lifetime, but killed after queue max lifetime. + rm.waitForState(app3.getApplicationId(), RMAppState.KILLED); + Assert.assertTrue("Application killed before lifetime value", + (app3.getFinishTime() - app3.getSubmitTime()) + / 1000 > queueMaxLifetime); + + // app4 submitted exceeding queue max lifetime, so killed after queue max + // lifetime. + rm.waitForState(app4.getApplicationId(), RMAppState.KILLED); + long totalTimeRun = (app4.getFinishTime() - app4.getSubmitTime()) / 1000; + Assert.assertTrue("Application killed before lifetime value", + totalTimeRun > queueMaxLifetime); + Assert.assertTrue( + "Application killed before lifetime value " + totalTimeRun, + totalTimeRun < 40L); } finally { stopRM(rm); } -- 2.10.1 (Apple Git-78)