diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d9840ac..277be6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -507,7 +507,15 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String RM_AM_MAX_ATTEMPTS =
RM_PREFIX + "am.max-attempts";
public static final int DEFAULT_RM_AM_MAX_ATTEMPTS = 2;
-
+
+ /**
+ * Whether deleting old application's attempts
+ * where application's attempt number is lager than RM_AM_MAX_ATTEMPTS
+ */
+ public static final String RM_AM_DELETE_OLD_ATTEMPTS_ENABLED =
+ RM_PREFIX + "am.delete-old-attempts.enabled";
+ public static final boolean DEFAULT_RM_AM_DELETE_OLD_ATTEMPTS_ENABLED = false;
+
/** The keytab for the resource manager.*/
public static final String RM_KEYTAB =
RM_PREFIX + "keytab";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 907f290..1f13509 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -338,6 +338,14 @@
+ Flag to enable delete old app attempt than {yarn.resourcemanager.am.max-attempts} .
+ Enable this Flag will reduce memory pressure of rm, but users can't get
+ old attempts information from RM web UI or restful api. The default value is false.
+ yarn.resourcemanager.am.delete-old-attempts.enabled
+ false
+
+
+
How often to check that containers are still alive.
yarn.resourcemanager.container.liveness-monitor.interval-ms
600000
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index c21d8d4..6ded681 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -140,6 +140,7 @@
private final ApplicationMasterService masterService;
private final StringBuilder diagnostics = new StringBuilder();
private final int maxAppAttempts;
+ private final boolean deleteOldAttemptsEnabled;
private final ReadLock readLock;
private final WriteLock writeLock;
private final Map attempts
@@ -467,6 +468,9 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
this.maxAppAttempts = individualMaxAppAttempts;
}
+ this.deleteOldAttemptsEnabled = conf.getBoolean(YarnConfiguration.RM_AM_DELETE_OLD_ATTEMPTS_ENABLED,
+ YarnConfiguration.DEFAULT_RM_AM_DELETE_OLD_ATTEMPTS_ENABLED);
+
this.attemptFailuresValidityInterval =
submissionContext.getAttemptFailuresValidityInterval();
if (this.attemptFailuresValidityInterval > 0) {
@@ -980,6 +984,13 @@ private void createNewAttempt(ApplicationAttemptId appAttemptId) {
currentAttempt = attempt;
}
+ private void tryCleanAttempts(ApplicationAttemptId id) {
+ if (deleteOldAttemptsEnabled) {
+ LOG.info("Remove attempt from RMApp : " + id);
+ attempts.remove(id);
+ }
+ }
+
private void
createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
createNewAttempt();
@@ -1607,6 +1618,7 @@ private void removeExcessAttempts(RMAppImpl app) {
long endTime = app.systemClock.getTime();
if (rmAppAttempt.getFinishTime() < (endTime
- app.attemptFailuresValidityInterval)) {
+ app.tryCleanAttempts(attemptId);
app.firstAttemptIdInStateStore++;
LOG.info("Remove attempt from state store : " + attemptId);
app.rmContext.getStateStore().removeApplicationAttempt(attemptId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 4f2fb1a..97c688b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -851,6 +851,146 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
rm2.stop();
}
+
+ @Test (timeout = 120000)
+ public void testRMAppAttemptFailuresValidityIntervalWhileDeleteOldAttempts() throws Exception {
+ getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ getConf().setBoolean(
+ YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
+
+ getConf().set(
+ YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ // explicitly set max-am-retry count as 2.
+ getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+ getConf().setBoolean(YarnConfiguration.RM_AM_DELETE_OLD_ATTEMPTS_ENABLED, true);
+
+ MockRM rm1 = new MockRM(getConf());
+ rm1.start();
+
+ MockMemoryRMStateStore memStore =
+ (MockMemoryRMStateStore) rm1.getRMStateStore();
+
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ ControlledClock clock = new ControlledClock();
+ // set window size to 10s
+ RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000, false);
+ app1.setSystemClock(clock);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ // Fail attempt1 normally
+ nm1.nodeHeartbeat(am1.getApplicationAttemptId(),
+ 1, ContainerState.COMPLETE);
+ rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+ //Wait to make sure attempt1 be removed in State Store
+ //TODO explore a better way than sleeping for a while (YARN-4929)
+ Thread.sleep(15 * 1000);
+
+ // launch the second attempt
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(2, app1.getAppAttempts().size());
+
+ RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
+ MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
+
+ // wait for 10 seconds
+ clock.setTime(System.currentTimeMillis() + 10*1000);
+ // Fail attempt2 normally
+ nm1.nodeHeartbeat(am2.getApplicationAttemptId(),
+ 1, ContainerState.COMPLETE);
+ rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+
+ // can launch the third attempt successfully
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+ // Lauch Attempt 3
+
+ MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
+
+ Thread.sleep(15 * 1000);
+ nm1.nodeHeartbeat(am3.getApplicationAttemptId(),
+ 1, ContainerState.COMPLETE);
+ rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+
+ ApplicationStateData appState =
+ ((MockMemoryRMStateStore) rm1.getRMStateStore()).getState().getApplicationState().
+ get(app1.getApplicationId());
+ Assert.assertEquals(2, appState.getAttemptCount());
+ Assert.assertFalse(app1.getAppAttempts().containsKey(am1.getApplicationAttemptId()));
+
+
+
+ RMAppAttempt attempt4 = app1.getCurrentAppAttempt();
+ MockAM am4 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ rm1.waitForState(am4.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
+
+ // Restart rm.
+ @SuppressWarnings("resource")
+ MockRM rm2 = new MockRM(getConf(), memStore);
+ rm2.start();
+ app1 = (RMAppImpl) rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+ MockMemoryRMStateStore memStore1 =
+ (MockMemoryRMStateStore) rm2.getRMStateStore();
+ ApplicationStateData app1State =
+ memStore1.getState().getApplicationState().
+ get(app1.getApplicationId());
+ Assert.assertEquals(2, app1State.getFirstAttemptId());
+ Assert.assertFalse(app1.getAppAttempts().containsKey(am1.getApplicationAttemptId()));
+
+ // re-register the NM
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
+ status
+ .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
+ status.setContainerId(attempt4.getMasterContainer().getId());
+ status.setContainerState(ContainerState.COMPLETE);
+ status.setDiagnostics("");
+ nm1.registerNode(Collections.singletonList(status), null);
+
+ rm2.waitForState(attempt4.getAppAttemptId(), RMAppAttemptState.FAILED);
+ //Wait to make sure attempt4 be removed in State Store
+ //TODO explore a better way than sleeping for a while (YARN-4929)
+ Thread.sleep(15 * 1000);
+ Assert.assertEquals(2, app1State.getAttemptCount());
+ Assert.assertFalse(app1.getAppAttempts().containsKey(am2.getApplicationAttemptId()));
+
+ rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+ // Lauch Attempt 5
+ MockAM am5 = MockRM.launchAndRegisterAM(app1, rm2, nm1);
+ rm2.waitForState(am5.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
+
+ // wait for 10 seconds
+ clock.setTime(System.currentTimeMillis() + 10*1000);
+ // Fail attempt4 normally
+ nm1.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ rm2.waitForState(am5.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+ Assert.assertEquals(2, app1State.getAttemptCount());
+ Assert.assertFalse(app1.getAppAttempts().containsKey(am3.getApplicationAttemptId()));
+
+ // can launch the 5th attempt successfully
+ rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+ MockAM am6 = MockRM.launchAndRegisterAM(app1, rm2, nm1);
+
+ clock.reset();
+ rm2.waitForState(am6.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
+
+ // Fail attempt6 normally
+ nm1
+ .nodeHeartbeat(am6.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ rm2.waitForState(am6.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+ Assert.assertEquals(2, app1State.getAttemptCount());
+ Assert.assertFalse(app1.getAppAttempts().containsKey(am4.getApplicationAttemptId()));
+
+ rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
+ rm1.stop();
+ rm2.stop();
+ }
+
private boolean isContainerIdInContainerStatus(
List containerStatuses, ContainerId containerId) {
for (ContainerStatus status : containerStatuses) {