diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index d9840ac..277be6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -507,7 +507,15 @@ public static boolean isAclEnabled(Configuration conf) { public static final String RM_AM_MAX_ATTEMPTS = RM_PREFIX + "am.max-attempts"; public static final int DEFAULT_RM_AM_MAX_ATTEMPTS = 2; - + + /** + * Whether deleting old application's attempts + * where application's attempt number is lager than RM_AM_MAX_ATTEMPTS + */ + public static final String RM_AM_DELETE_OLD_ATTEMPTS_ENABLED = + RM_PREFIX + "am.delete-old-attempts.enabled"; + public static final boolean DEFAULT_RM_AM_DELETE_OLD_ATTEMPTS_ENABLED = false; + /** The keytab for the resource manager.*/ public static final String RM_KEYTAB = RM_PREFIX + "keytab"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 907f290..1f13509 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -338,6 +338,14 @@ + Flag to enable delete old app attempt than {yarn.resourcemanager.am.max-attempts} . + Enable this Flag will reduce memory pressure of rm, but users can't get + old attempts information from RM web UI or restful api. The default value is false. + yarn.resourcemanager.am.delete-old-attempts.enabled + false + + + How often to check that containers are still alive. yarn.resourcemanager.container.liveness-monitor.interval-ms 600000 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index c21d8d4..6ded681 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -140,6 +140,7 @@ private final ApplicationMasterService masterService; private final StringBuilder diagnostics = new StringBuilder(); private final int maxAppAttempts; + private final boolean deleteOldAttemptsEnabled; private final ReadLock readLock; private final WriteLock writeLock; private final Map attempts @@ -467,6 +468,9 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.maxAppAttempts = individualMaxAppAttempts; } + this.deleteOldAttemptsEnabled = conf.getBoolean(YarnConfiguration.RM_AM_DELETE_OLD_ATTEMPTS_ENABLED, + YarnConfiguration.DEFAULT_RM_AM_DELETE_OLD_ATTEMPTS_ENABLED); + this.attemptFailuresValidityInterval = submissionContext.getAttemptFailuresValidityInterval(); if (this.attemptFailuresValidityInterval > 0) { @@ -980,6 +984,13 @@ private void createNewAttempt(ApplicationAttemptId appAttemptId) { currentAttempt = attempt; } + private void tryCleanAttempts(ApplicationAttemptId id) { + if (deleteOldAttemptsEnabled) { + LOG.info("Remove attempt from RMApp : " + id); + attempts.remove(id); + } + } + private void createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) { createNewAttempt(); @@ -1607,6 +1618,7 @@ private void removeExcessAttempts(RMAppImpl app) { long endTime = app.systemClock.getTime(); if (rmAppAttempt.getFinishTime() < (endTime - app.attemptFailuresValidityInterval)) { + app.tryCleanAttempts(attemptId); app.firstAttemptIdInStateStore++; LOG.info("Remove attempt from state store : " + attemptId); app.rmContext.getStateStore().removeApplicationAttempt(attemptId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 4f2fb1a..97c688b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -851,6 +851,146 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { rm2.stop(); } + + @Test (timeout = 120000) + public void testRMAppAttemptFailuresValidityIntervalWhileDeleteOldAttempts() throws Exception { + getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + getConf().setBoolean( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); + + getConf().set( + YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + // explicitly set max-am-retry count as 2. + getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + getConf().setBoolean(YarnConfiguration.RM_AM_DELETE_OLD_ATTEMPTS_ENABLED, true); + + MockRM rm1 = new MockRM(getConf()); + rm1.start(); + + MockMemoryRMStateStore memStore = + (MockMemoryRMStateStore) rm1.getRMStateStore(); + + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + + ControlledClock clock = new ControlledClock(); + // set window size to 10s + RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000, false); + app1.setSystemClock(clock); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + // Fail attempt1 normally + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), + 1, ContainerState.COMPLETE); + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); + //Wait to make sure attempt1 be removed in State Store + //TODO explore a better way than sleeping for a while (YARN-4929) + Thread.sleep(15 * 1000); + + // launch the second attempt + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(2, app1.getAppAttempts().size()); + + RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); + MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.RUNNING); + + // wait for 10 seconds + clock.setTime(System.currentTimeMillis() + 10*1000); + // Fail attempt2 normally + nm1.nodeHeartbeat(am2.getApplicationAttemptId(), + 1, ContainerState.COMPLETE); + rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); + + // can launch the third attempt successfully + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // Lauch Attempt 3 + + MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.RUNNING); + + Thread.sleep(15 * 1000); + nm1.nodeHeartbeat(am3.getApplicationAttemptId(), + 1, ContainerState.COMPLETE); + rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.FAILED); + + ApplicationStateData appState = + ((MockMemoryRMStateStore) rm1.getRMStateStore()).getState().getApplicationState(). + get(app1.getApplicationId()); + Assert.assertEquals(2, appState.getAttemptCount()); + Assert.assertFalse(app1.getAppAttempts().containsKey(am1.getApplicationAttemptId())); + + + + RMAppAttempt attempt4 = app1.getCurrentAppAttempt(); + MockAM am4 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + rm1.waitForState(am4.getApplicationAttemptId(), RMAppAttemptState.RUNNING); + + // Restart rm. + @SuppressWarnings("resource") + MockRM rm2 = new MockRM(getConf(), memStore); + rm2.start(); + app1 = (RMAppImpl) rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + MockMemoryRMStateStore memStore1 = + (MockMemoryRMStateStore) rm2.getRMStateStore(); + ApplicationStateData app1State = + memStore1.getState().getApplicationState(). + get(app1.getApplicationId()); + Assert.assertEquals(2, app1State.getFirstAttemptId()); + Assert.assertFalse(app1.getAppAttempts().containsKey(am1.getApplicationAttemptId())); + + // re-register the NM + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + NMContainerStatus status = Records.newRecord(NMContainerStatus.class); + status + .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER); + status.setContainerId(attempt4.getMasterContainer().getId()); + status.setContainerState(ContainerState.COMPLETE); + status.setDiagnostics(""); + nm1.registerNode(Collections.singletonList(status), null); + + rm2.waitForState(attempt4.getAppAttemptId(), RMAppAttemptState.FAILED); + //Wait to make sure attempt4 be removed in State Store + //TODO explore a better way than sleeping for a while (YARN-4929) + Thread.sleep(15 * 1000); + Assert.assertEquals(2, app1State.getAttemptCount()); + Assert.assertFalse(app1.getAppAttempts().containsKey(am2.getApplicationAttemptId())); + + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // Lauch Attempt 5 + MockAM am5 = MockRM.launchAndRegisterAM(app1, rm2, nm1); + rm2.waitForState(am5.getApplicationAttemptId(), RMAppAttemptState.RUNNING); + + // wait for 10 seconds + clock.setTime(System.currentTimeMillis() + 10*1000); + // Fail attempt4 normally + nm1.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + rm2.waitForState(am5.getApplicationAttemptId(), RMAppAttemptState.FAILED); + Assert.assertEquals(2, app1State.getAttemptCount()); + Assert.assertFalse(app1.getAppAttempts().containsKey(am3.getApplicationAttemptId())); + + // can launch the 5th attempt successfully + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + MockAM am6 = MockRM.launchAndRegisterAM(app1, rm2, nm1); + + clock.reset(); + rm2.waitForState(am6.getApplicationAttemptId(), RMAppAttemptState.RUNNING); + + // Fail attempt6 normally + nm1 + .nodeHeartbeat(am6.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + rm2.waitForState(am6.getApplicationAttemptId(), RMAppAttemptState.FAILED); + Assert.assertEquals(2, app1State.getAttemptCount()); + Assert.assertFalse(app1.getAppAttempts().containsKey(am4.getApplicationAttemptId())); + + rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED); + rm1.stop(); + rm2.stop(); + } + private boolean isContainerIdInContainerStatus( List containerStatuses, ContainerId containerId) { for (ContainerStatus status : containerStatuses) {