From 8b393991ab6c03bfd47e605580191044d2f1af12 Mon Sep 17 00:00:00 2001
From: Rohith Sharma K S
Date: Mon, 12 Sep 2016 18:16:36 +0530
Subject: [PATCH] YARN-4205
---
.../api/records/ApplicationSubmissionContext.java | 21 +++
.../yarn/api/records/ApplicationTimeouts.java | 100 +++++++++++++
.../apache/hadoop/yarn/conf/YarnConfiguration.java | 6 +
.../src/main/proto/yarn_protos.proto | 5 +
.../pb/ApplicationSubmissionContextPBImpl.java | 38 +++++
.../records/impl/pb/ApplicationTimeoutsPBImpl.java | 68 +++++++++
.../yarn/util/AbstractLivelinessMonitor.java | 30 ++--
.../src/main/resources/yarn-default.xml | 9 ++
.../apache/hadoop/yarn/api/TestPBImplRecords.java | 10 ++
.../resourcemanager/RMActiveServiceContext.java | 16 +++
.../yarn/server/resourcemanager/RMContext.java | 5 +
.../yarn/server/resourcemanager/RMContextImpl.java | 12 ++
.../server/resourcemanager/ResourceManager.java | 9 ++
.../server/resourcemanager/rmapp/RMAppImpl.java | 39 ++++-
.../rmapp/RMAppLifetimeMonitor.java | 113 +++++++++++++++
.../hadoop/yarn/server/resourcemanager/MockRM.java | 24 +++-
.../rmapp/TestApplicationLifetimeMonitor.java | 160 +++++++++++++++++++++
17 files changed, 651 insertions(+), 14 deletions(-)
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationTimeouts.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationTimeoutsPBImpl.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLifetimeMonitor.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
index 21cd1bb..82e3f99 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
@@ -535,4 +535,25 @@ public abstract void setLogAggregationContext(
@Public
@Unstable
public abstract void setReservationID(ReservationId reservationID);
+
+ /**
+ * Get the application lifetime value.
+ * The application will be killed if is not completed in the given time.
+ *
+ * @return application lifetime value.
+ */
+ @Public
+ @Unstable
+ public abstract ApplicationTimeouts getApplicationTimeouts();
+
+ /**
+ * Set the application lifetime value. The application will be killed if is
+ * not completed in the given time.
+ * Note : Do not configure very less value.
+ *
+ * @param applicationLifetime application lifetime value in seconds
+ */
+ @Public
+ @Unstable
+ public abstract void setApplicationTimeouts(ApplicationTimeouts appTimeouts);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationTimeouts.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationTimeouts.java
new file mode 100644
index 0000000..b3257cc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationTimeouts.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * {@code ApplicationTimeouts} represents user specified timeouts on
+ * application. Timeouts can be life_timeout, queue_timeout, statestore_timeout.
+ * Currently YARN supports life_timeout only.
+ *
+ * life_timeout : Timeout imposed on overall application execution time. If
+ * this is set, then timeout monitoring start from application submissions time.
+ *
+ */
+@Public
+@Unstable
+public abstract class ApplicationTimeouts
+ implements Comparable {
+
+ @Public
+ @Unstable
+ public static ApplicationTimeouts newInstance(long lifetime) {
+ ApplicationTimeouts timeouts = Records.newRecord(ApplicationTimeouts.class);
+ timeouts.setLifetimeout(lifetime);
+ return timeouts;
+ }
+
+ /**
+ * Get the application lifetime value. The application will be killed if is
+ * not completed in the given time.
+ * @return application lifetime value in seconds.
+ */
+ @Public
+ @Unstable
+ public abstract long getLifetimeout();
+
+ /**
+ * Set the application lifetime value. The application will be killed if is
+ * not completed in the given time.
+ * Note : Do not configure very less value.
+ *
+ * @param Lifetime application lifetime value in seconds
+ */
+ @Public
+ @Unstable
+ public abstract void setLifetimeout(long Lifetime);
+
+ @Override
+ public int compareTo(ApplicationTimeouts other) {
+ long diff = this.getLifetimeout() - other.getLifetimeout();
+ return diff == 0 ? 0 : (diff > 0 ? 1 : -1);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 517861;
+ int result = 9511;
+ result = (int) (prime * result + getLifetimeout());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ApplicationTimeouts other = (ApplicationTimeouts) obj;
+ if (getLifetimeout() != other.getLifetimeout())
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "ApplicationTimeouts: lifetimeout=" + getLifetimeout();
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 86e8a95..188da6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1541,6 +1541,12 @@ public static boolean isAclEnabled(Configuration conf) {
false;
+ // Configurations for applicaiton life time monitor feature
+ public static final String RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS =
+ RM_PREFIX + "application.lifetimeout-monitor.interval-ms";
+
+ public static final long DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS =
+ 60000;
/**
* Interval of time the linux container executor should try cleaning up
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 2d6007e..b0eba49 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -365,6 +365,11 @@ message ApplicationSubmissionContextProto {
optional ReservationIdProto reservation_id = 15;
optional string node_label_expression = 16;
optional ResourceRequestProto am_container_resource_request = 17;
+ optional ApplicationTimeoutsProto application_timeouts = 18;
+}
+
+message ApplicationTimeoutsProto {
+ optional int64 life_timeout = 1 [default = -1];
}
message LogAggregationContextProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
index 67e3a84..5b8b763 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeouts;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -36,6 +37,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutsProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
@@ -63,6 +65,7 @@
private ResourceRequest amResourceRequest = null;
private LogAggregationContext logAggregationContext = null;
private ReservationId reservationId = null;
+ private ApplicationTimeouts applicationTimeouts = null;
public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder();
@@ -131,6 +134,10 @@ private void mergeLocalToBuilder() {
if (this.reservationId != null) {
builder.setReservationId(convertToProtoFormat(this.reservationId));
}
+ if (this.applicationTimeouts != null) {
+ builder.setApplicationTimeouts(
+ convertToProtoFormat(this.applicationTimeouts));
+ }
}
private void mergeLocalToProto() {
@@ -548,4 +555,35 @@ private ReservationIdPBImpl convertFromProtoFormat(ReservationIdProto p) {
private ReservationIdProto convertToProtoFormat(ReservationId t) {
return ((ReservationIdPBImpl) t).getProto();
}
+
+ @Override
+ public ApplicationTimeouts getApplicationTimeouts() {
+ ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+
+ if (this.applicationTimeouts != null) {
+ return this.applicationTimeouts;
+ } // Else via proto
+ if (!p.hasApplicationTimeouts()) {
+ return null;
+ }
+ applicationTimeouts = convertFromProtoFormat(p.getApplicationTimeouts());
+ return applicationTimeouts;
+ }
+
+ @Override
+ public void setApplicationTimeouts(ApplicationTimeouts appTimeouts) {
+ maybeInitBuilder();
+ if (applicationTimeouts == null)
+ builder.clearApplicationTimeouts();
+ this.applicationTimeouts = appTimeouts;
+ }
+
+ private ApplicationTimeoutsPBImpl convertFromProtoFormat(
+ ApplicationTimeoutsProto p) {
+ return new ApplicationTimeoutsPBImpl(p);
+ }
+
+ private ApplicationTimeoutsProto convertToProtoFormat(ApplicationTimeouts t) {
+ return ((ApplicationTimeoutsPBImpl) t).getProto();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationTimeoutsPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationTimeoutsPBImpl.java
new file mode 100644
index 0000000..5b34c20
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationTimeoutsPBImpl.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeouts;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutsProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutsProtoOrBuilder;
+
+@Private
+@Unstable
+public class ApplicationTimeoutsPBImpl extends ApplicationTimeouts {
+ ApplicationTimeoutsProto proto =
+ ApplicationTimeoutsProto.getDefaultInstance();
+ ApplicationTimeoutsProto.Builder builder = null;
+ boolean viaProto = false;
+
+ public ApplicationTimeoutsPBImpl() {
+ builder = ApplicationTimeoutsProto.newBuilder();
+ }
+
+ public ApplicationTimeoutsPBImpl(ApplicationTimeoutsProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public ApplicationTimeoutsProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = ApplicationTimeoutsProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public long getLifetimeout() {
+ ApplicationTimeoutsProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getLifeTimeout();
+ }
+
+ @Override
+ public void setLifetimeout(long lifetimeout) {
+ maybeInitBuilder();
+ builder.setLifeTimeout(lifetimeout);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
index e80d032..f4e8443 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
@@ -44,8 +44,8 @@
private Thread checkerThread;
private volatile boolean stopped;
public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins
- private int expireInterval = DEFAULT_EXPIRE;
- private int monitorInterval = expireInterval/3;
+ private long expireInterval = DEFAULT_EXPIRE;
+ private long monitorInterval = expireInterval / 3;
private final Clock clock;
@@ -85,7 +85,12 @@ protected void setExpireInterval(int expireInterval) {
this.expireInterval = expireInterval;
}
- protected void setMonitorInterval(int monitorInterval) {
+ protected long getExpireInterval(O o) {
+ // by-default return for all the registered object interval.
+ return this.expireInterval;
+ }
+
+ protected void setMonitorInterval(long monitorInterval) {
this.monitorInterval = monitorInterval;
}
@@ -97,7 +102,11 @@ public synchronized void receivedPing(O ob) {
}
public synchronized void register(O ob) {
- running.put(ob, clock.getTime());
+ register(ob, clock.getTime());
+ }
+
+ public synchronized void register(O ob, long timeout) {
+ running.put(ob, timeout);
}
public synchronized void unregister(O ob) {
@@ -117,19 +126,20 @@ public synchronized void resetTimer() {
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
synchronized (AbstractLivelinessMonitor.this) {
- Iterator> iterator =
- running.entrySet().iterator();
+ Iterator> iterator = running.entrySet().iterator();
- //avoid calculating current time everytime in loop
+ // avoid calculating current time everytime in loop
long currentTime = clock.getTime();
while (iterator.hasNext()) {
Map.Entry entry = iterator.next();
+ O key = entry.getKey();
+ long expireInterval = getExpireInterval(key);
if (currentTime > entry.getValue() + expireInterval) {
iterator.remove();
- expire(entry.getKey());
- LOG.info("Expired:" + entry.getKey().toString() +
- " Timed out after " + expireInterval/1000 + " secs");
+ expire(key);
+ LOG.info("Expired:" + entry.getKey().toString()
+ + " Timed out after " + expireInterval / 1000 + " secs");
}
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 423b78b..9e437da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3082,4 +3082,13 @@
yarn.resourcemanager.node-removal-untracked.timeout-ms
60000
+
+
+
+ The RMAppLifeTimeMonitorService uses this value as monitor interval.
+
+ yarn.resourcemanager.application.lifetimeout-monitor.interval-ms
+ 60000
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index 11bf56b..7b0645d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -117,6 +117,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeouts;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -164,6 +165,7 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationTimeoutsPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
@@ -199,6 +201,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutsProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
@@ -469,6 +472,7 @@ public static void setup() throws Exception {
SerializedException.newInstance(new IOException("exception for test")));
generateByNewInstance(ExecutionTypeRequest.class);
generateByNewInstance(LogAggregationContext.class);
+ generateByNewInstance(ApplicationTimeouts.class);
generateByNewInstance(ApplicationId.class);
generateByNewInstance(ApplicationAttemptId.class);
generateByNewInstance(ContainerId.class);
@@ -1377,4 +1381,10 @@ public void testExecutionTypeRequestPBImpl() throws Exception {
validatePBImplRecord(ExecutionTypeRequestPBImpl.class,
ExecutionTypeRequestProto.class);
}
+
+ @Test
+ public void testApplicationTimeoutsPBImpl() throws Exception {
+ validatePBImplRecord(ApplicationTimeoutsPBImpl.class,
+ ApplicationTimeoutsProto.class);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index caa0ff13..0bf5f21 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -105,6 +106,8 @@
private boolean isSchedulerReady = false;
private PlacementManager queuePlacementManager = null;
+ private RMAppLifetimeMonitor rmAppLifetimeMonitor;
+
public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager();
}
@@ -467,4 +470,17 @@ public PlacementManager getQueuePlacementManager() {
public void setQueuePlacementManager(PlacementManager placementMgr) {
this.queuePlacementManager = placementMgr;
}
+
+ @Private
+ @Unstable
+ public void setRMAppLifetimeMonitor(
+ RMAppLifetimeMonitor rmAppLifetimeMonitor) {
+ this.rmAppLifetimeMonitor = rmAppLifetimeMonitor;
+ }
+
+ @Private
+ @Unstable
+ public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
+ return this.rmAppLifetimeMonitor;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 2ba445c..f313a52 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -149,4 +150,8 @@ void setRMDelegatedNodeLabelsUpdater(
LeaderElectorService getLeaderElectorService();
QueueLimitCalculator getNodeManagerQueueLimitCalculator();
+
+ void setRMAppLifetimeMonitor(RMAppLifetimeMonitor rmAppLifetimeMonitor);
+
+ RMAppLifetimeMonitor getRMAppLifetimeMonitor();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 1e702de..b6d4fb8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -499,4 +500,15 @@ public void setContainerQueueLimitCalculator(
QueueLimitCalculator limitCalculator) {
this.queueLimitCalculator = limitCalculator;
}
+
+ @Override
+ public void setRMAppLifetimeMonitor(
+ RMAppLifetimeMonitor rmAppLifeTimeMonitor) {
+ this.activeServiceContext.setRMAppLifetimeMonitor(rmAppLifeTimeMonitor);
+ }
+
+ @Override
+ public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
+ return this.activeServiceContext.getRMAppLifetimeMonitor();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index bf72fc1..d1c3e7c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -92,6 +92,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -556,6 +557,10 @@ protected void serviceInit(Configuration configuration) throws Exception {
addService(amFinishingMonitor);
rmContext.setAMFinishingMonitor(amFinishingMonitor);
+ RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor();
+ addService(rmAppLifetimeMonitor);
+ rmContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor);
+
RMNodeLabelsManager nlm = createNodeLabelManager();
nlm.setRMContext(rmContext);
addService(nlm);
@@ -1398,4 +1403,8 @@ private static void printUsage(PrintStream out) {
out.println(" "
+ "[-remove-application-from-state-store ]" + "\n");
}
+
+ protected RMAppLifetimeMonitor createRMAppLifetimeMonitor() {
+ return new RMAppLifetimeMonitor(this.rmContext);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index e5bde32..5ba56bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeouts;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -1106,6 +1107,18 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
}
}
+ long applicationLifetime = getApplicationLifetime(app);
+ if (applicationLifetime > 0) {
+ app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
+ app.submitTime, applicationLifetime * 1000);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Application " + app.applicationId
+ + " is registered with Application lifetime monitor after recovery. "
+ + "The lifetime configured is " + applicationLifetime
+ + " seconds");
+ }
+ }
+
// No existent attempts means the attempt associated with this app was not
// started or started but not yet saved.
if (app.attempts.isEmpty()) {
@@ -1152,6 +1165,12 @@ public void transition(RMAppImpl app, RMAppEvent event) {
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+ long applicationLifetime = getApplicationLifetime(app);
+ if (applicationLifetime > 0) {
+ app.rmContext.getRMAppLifetimeMonitor()
+ .unregisterApp(app.applicationId);
+ }
+
if (app.transitionTodo instanceof SingleArcTransition) {
((SingleArcTransition) app.transitionTodo).transition(app,
app.eventCausingFinalSaving);
@@ -1160,7 +1179,6 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
app.eventCausingFinalSaving);
}
return app.targetedFinalState;
-
}
}
@@ -1209,6 +1227,15 @@ private String getAppAttemptFailedDiagnostics(RMAppEvent event) {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
+ long applicationLifetime = getApplicationLifetime(app);
+ if (applicationLifetime > 0) {
+ app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
+ app.submitTime, applicationLifetime * 1000);
+ LOG.info("Application " + app.applicationId
+ + " is registered with Application lifetime monitor. "
+ + "The lifetime configured is " + applicationLifetime + " seconds");
+ }
+
// If recovery is enabled then store the application information in a
// non-blocking call so make sure that RM has stored the information
// needed to restart the AM after RM restart without further client
@@ -1922,4 +1949,14 @@ private void sendATSCreateEvent() {
public int getNextAttemptId() {
return nextAttemptId;
}
+
+ private static long getApplicationLifetime(RMApp app) {
+ ApplicationTimeouts applicationTimeouts =
+ app.getApplicationSubmissionContext().getApplicationTimeouts();
+ long applicationLifetime = -1;
+ if (applicationTimeouts != null) {
+ applicationLifetime = applicationTimeouts.getLifetimeout();
+ }
+ return applicationLifetime;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLifetimeMonitor.java
new file mode 100644
index 0000000..0a94c80
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLifetimeMonitor.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import java.util.EnumSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+/**
+ * This service will monitor the applications against the lifetime value given.
+ * The applications will be killed if it running beyond the given time.
+ */
+public class RMAppLifetimeMonitor
+ extends AbstractLivelinessMonitor {
+
+ private static final Log LOG = LogFactory.getLog(RMAppLifetimeMonitor.class);
+
+ private RMContext rmContext;
+ private ConcurrentMap monitoredApps =
+ new ConcurrentHashMap();
+
+ private static final EnumSet COMPLETED_APP_STATES =
+ EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED,
+ RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING);
+
+ public RMAppLifetimeMonitor(RMContext rmContext) {
+ super(RMAppLifetimeMonitor.class.getName(), SystemClock.getInstance());
+ this.rmContext = rmContext;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ long monitorInterval = conf.getLong(
+ YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS);
+ if (monitorInterval <= 0) {
+ monitorInterval =
+ YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS;
+ }
+ setMonitorInterval(monitorInterval);
+ LOG.info("Application lifelime monitor interval set to " + monitorInterval
+ + " ms.");
+ super.serviceInit(conf);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void expire(ApplicationId appId) {
+ Long remove = monitoredApps.remove(appId);
+ RMApp app = rmContext.getRMApps().get(appId);
+ if (app == null) {
+ return;
+ }
+ // Don't trigger an KILL event if application is in completed states
+ if (!COMPLETED_APP_STATES.contains(app.getState())) {
+ String diagnosis =
+ "Application killed due to exceeding its lifetime period " + remove
+ + " milliseconds";
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(appId, RMAppEventType.KILL, diagnosis));
+ } else {
+ LOG.info("Application " + appId
+ + " is about to complete. So not killing the application.");
+ }
+ }
+
+ public void registerApp(ApplicationId appId, long submitTime,
+ long applicationLifetime) {
+ register(appId, submitTime);
+ monitoredApps.putIfAbsent(appId, applicationLifetime);
+ }
+
+ @Override
+ protected long getExpireInterval(ApplicationId appId) {
+ return monitoredApps.get(appId);
+ }
+
+ public void unregisterApp(ApplicationId appId) {
+ unregister(appId);
+ monitoredApps.remove(appId);
+ }
+
+ public void updateApplicationLifetime(ApplicationId appId,
+ long newAppLifetime) {
+ // only update for the registered objects
+ monitoredApps.replace(appId, newAppLifetime);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index f843261..832d94d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -50,6 +50,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeouts;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -460,7 +461,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
return submitApp(resource, name, user, acls, false, queue,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
- false, null, 0, null, true, priority, amLabel);
+ false, null, 0, null, true, priority, amLabel, -1);
}
public RMApp submitApp(Resource resource, String name, String user,
@@ -561,7 +562,7 @@ public RMApp submitApp(Resource capability, String name, String user,
return submitApp(capability, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
isAppIdProvided, applicationId, attemptFailuresValidityInterval,
- logAggregationContext, cancelTokensWhenComplete, priority, "");
+ logAggregationContext, cancelTokensWhenComplete, priority, "", -1);
}
public RMApp submitApp(Resource capability, String name, String user,
@@ -570,7 +571,8 @@ public RMApp submitApp(Resource capability, String name, String user,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId, long attemptFailuresValidityInterval,
LogAggregationContext logAggregationContext,
- boolean cancelTokensWhenComplete, Priority priority, String amLabel)
+ boolean cancelTokensWhenComplete, Priority priority, String amLabel,
+ long applicationLifetime)
throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
@@ -587,6 +589,12 @@ public RMApp submitApp(Resource capability, String name, String user,
sub.setApplicationId(appId);
sub.setApplicationName(name);
sub.setMaxAppAttempts(maxAppAttempts);
+ if (-1 != applicationLifetime) {
+ ApplicationTimeouts timeouts =
+ Records.newRecord(ApplicationTimeouts.class);
+ timeouts.setLifetimeout(applicationLifetime);
+ sub.setApplicationTimeouts(timeouts);
+ }
if (unmanaged) {
sub.setUnmanagedAM(true);
}
@@ -1073,4 +1081,14 @@ public void waitForAppRemovedFromScheduler(ApplicationId appId,
!apps.containsKey(appId));
LOG.info("app is removed from scheduler, " + appId);
}
+ public RMApp submitApp(int masterMemory, Priority priority,
+ long applicationLifeTime) throws Exception {
+ Resource resource = Resource.newInstance(masterMemory, 0);
+ return submitApp(
+ resource,"",UserGroupInformation.getCurrentUser().getShortUserName(),
+ null,false,null,
+ super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
+ false, false, null, 0, null, true, priority, null, applicationLifeTime);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java
new file mode 100644
index 0000000..129a856
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Set;
+
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
+import org.apache.hadoop.yarn.server.resourcemanager.TestWorkPreservingRMRestart;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class for application life time monitor feature test.
+ */
+public class TestApplicationLifetimeMonitor {
+ private final int GB = 1024;
+
+ private YarnConfiguration conf;
+
+ @Before
+ public void setup() throws IOException {
+ conf = new YarnConfiguration();
+ Logger rootLogger = LogManager.getRootLogger();
+ rootLogger.setLevel(Level.DEBUG);
+ UserGroupInformation.setConfiguration(conf);
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+ true);
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ conf.setLong(YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS,
+ 3000L);
+ }
+
+ @Test(timeout = 90000)
+ public void testApplicationLifeTimeMonitor() throws Exception {
+ MockRM rm = null;
+ try {
+ rm = new MockRM(conf);
+ rm.start();
+ Priority appPriority = Priority.newInstance(0);
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB);
+ RMApp app1 = rm.submitApp(1 * GB, appPriority, 10);
+ nm1.nodeHeartbeat(true);
+ // Send launch Event
+ MockAM am1 =
+ rm.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
+ am1.registerAppAttempt();
+ rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+ Assert.assertTrue("Applicaiton killed before life timeout value",
+ (System.currentTimeMillis() - app1.getSubmitTime()) > 10000);
+ } finally {
+ stopRM(rm);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test(timeout = 180000)
+ public void testApplicationLifeTimeOnRMRestart() throws Exception {
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ nm1.nodeHeartbeat(true);
+
+ long appLifetime = 60l;
+ RMApp app1 = rm1.submitApp(200, Priority.newInstance(0), appLifetime);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // Re-start RM
+ MockRM rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+ // recover app
+ RMApp recoveredApp1 =
+ rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+
+ NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus(
+ am1.getApplicationAttemptId(), 1, ContainerState.RUNNING);
+ NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus(
+ am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
+
+ nm1.registerNode(Arrays.asList(amContainer, runningContainer), null);
+
+ // Wait for RM to settle down on recovering containers;
+ TestWorkPreservingRMRestart.waitForNumContainersToRecover(2, rm2,
+ am1.getApplicationAttemptId());
+ Set launchedContainers =
+ ((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId()))
+ .getLaunchedContainers();
+ assertTrue(launchedContainers.contains(amContainer.getContainerId()));
+ assertTrue(launchedContainers.contains(runningContainer.getContainerId()));
+
+ // check RMContainers are re-recreated and the container state is correct.
+ rm2.waitForState(nm1, amContainer.getContainerId(),
+ RMContainerState.RUNNING);
+ rm2.waitForState(nm1, runningContainer.getContainerId(),
+ RMContainerState.RUNNING);
+
+ // re register attempt to rm2
+ rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.ACCEPTED);
+ am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+ am1.registerAppAttempt();
+ rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.RUNNING);
+
+ // wait for app life time and application to be in killed state.
+ rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.KILLED);
+ Assert.assertTrue("Applicaiton killed before life timeout value",
+ (System.currentTimeMillis()
+ - recoveredApp1.getSubmitTime()) > appLifetime);
+ }
+
+ private void stopRM(MockRM rm) {
+ if (rm != null) {
+ rm.stop();
+ }
+ }
+}
--
2.7.4 (Apple Git-66)