diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index b6ca684..1c9d08e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1109,7 +1109,7 @@ private int getNumNonPreemptedAppAttempts() {
int completedAttempts = 0;
// Do not count AM preemption as attempt failure.
for (RMAppAttempt attempt : attempts.values()) {
- if (!attempt.isPreempted()) {
+ if (!attempt.shouldNotCountFailureToAttemptLimit()) {
completedAttempts++;
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
index 42c37a9..e919685 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
@@ -197,8 +197,14 @@
ApplicationAttemptReport createApplicationAttemptReport();
/**
- * Return the flag which indicates whether the attempt is preempted by the
- * scheduler.
- */
- boolean isPreempted();
+ * Return the flag which indicates whether this attempt's failure should
+ * not be counted towards the max attempt limit.
+ *
+ * These failure types should not be counted towards the attempt limit:
+ * - the AM container was preempted by the scheduler.
+ * - hardware failures, such as the NM failing, a lost NM, or NM disk errors.
+ * - the AM container was killed by the RM because of RM restart or failover.
+ */
+ boolean shouldNotCountFailureToAttemptLimit();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 1e7693f..af8b954 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -1087,7 +1087,7 @@ public void transition(RMAppAttemptImpl appAttempt,
.getKeepContainersAcrossApplicationAttempts()
&& !appAttempt.submissionContext.getUnmanagedAM()) {
// See if we should retain containers for non-unmanaged applications
- if (appAttempt.isPreempted()) {
+ if (appAttempt.shouldNotCountFailureToAttemptLimit()) {
// Premption doesn't count towards app-failures and so we should
// retain containers.
keepContainersAcrossAppAttempts = true;
@@ -1136,8 +1136,17 @@ public void transition(RMAppAttemptImpl appAttempt,
}
@Override
- public boolean isPreempted() {
- return getAMContainerExitStatus() == ContainerExitStatus.PREEMPTED;
+ public boolean shouldNotCountFailureToAttemptLimit() {
+ this.readLock.lock();
+ try {
+ int exitStatus = getAMContainerExitStatus();
+ // Failures caused by preemption, hardware problems (e.g. a lost NM or
+ // NM disk errors) or the RM killing the container on restart/failover
+ // do not count towards the attempt limit.
+ return exitStatus == ContainerExitStatus.PREEMPTED
+ || exitStatus == ContainerExitStatus.ABORTED
+ || exitStatus == ContainerExitStatus.DISKS_FAILED
+ || exitStatus == ContainerExitStatus.KILLED_BY_RESOURCEMANAGER;
+ } finally {
+ this.readLock.unlock();
+ }
}
private static final class UnmanagedAMAttemptSavedTransition
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index ec4ed0e..164a2f2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -34,6 +35,7 @@
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -44,10 +46,12 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@@ -372,7 +376,7 @@ public void testAMPreemptedNotCountedForAMFailures() throws Exception {
scheduler.killContainer(scheduler.getRMContainer(amContainer));
am1.waitForState(RMAppAttemptState.FAILED);
- Assert.assertTrue(attempt1.isPreempted());
+ Assert.assertTrue(attempt1.shouldNotCountFailureToAttemptLimit());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
// AM should be restarted even though max-am-attempt is 1.
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
@@ -385,7 +389,7 @@ public void testAMPreemptedNotCountedForAMFailures() throws Exception {
scheduler.killContainer(scheduler.getRMContainer(amContainer2));
am2.waitForState(RMAppAttemptState.FAILED);
- Assert.assertTrue(attempt2.isPreempted());
+ Assert.assertTrue(attempt2.shouldNotCountFailureToAttemptLimit());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
@@ -394,7 +398,7 @@ public void testAMPreemptedNotCountedForAMFailures() throws Exception {
// fail the AM normally
nm1.nodeHeartbeat(am3.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am3.waitForState(RMAppAttemptState.FAILED);
- Assert.assertFalse(attempt3.isPreempted());
+ Assert.assertFalse(attempt3.shouldNotCountFailureToAttemptLimit());
// AM should not be restarted.
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
@@ -434,7 +438,7 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception {
scheduler.killContainer(scheduler.getRMContainer(amContainer));
am1.waitForState(RMAppAttemptState.FAILED);
- Assert.assertTrue(attempt1.isPreempted());
+ Assert.assertTrue(attempt1.shouldNotCountFailureToAttemptLimit());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
// state store has 1 attempt stored.
@@ -458,11 +462,166 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception {
RMAppAttempt attempt2 =
rm2.getRMContext().getRMApps().get(app1.getApplicationId())
.getCurrentAppAttempt();
- Assert.assertFalse(attempt2.isPreempted());
+ Assert.assertFalse(attempt2.shouldNotCountFailureToAttemptLimit());
Assert.assertEquals(ContainerExitStatus.INVALID,
appState.getAttempt(am2.getApplicationAttemptId())
.getAMContainerExitStatus());
rm1.stop();
rm2.stop();
}
+
+ // An AM container that is preempted or that fails because of an NM disk
+ // failure should not be counted towards the AM max retry count.
+ @Test(timeout = 50000)
+ public void testShouldNotCountFailureToAttemptLimit() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ // explicitly set max-am-retry count as 1.
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+ MockRM rm1 = new MockRM(conf);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ RMApp app1 = rm1.submitApp(200);
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ CapacityScheduler scheduler =
+ (CapacityScheduler) rm1.getResourceScheduler();
+ ContainerId amContainer =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 1);
+ // Preempt the first attempt;
+ scheduler.killContainer(scheduler.getRMContainer(amContainer));
+
+ am1.waitForState(RMAppAttemptState.FAILED);
+ Assert.assertTrue(attempt1.shouldNotCountFailureToAttemptLimit());
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ // AM should be restarted even though max-am-attempt is 1.
+ MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
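+ // With max-am-attempts set to 1, every new attempt may be the last one;
+ // the app is still relaunched because the previous failure is not counted.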
+ Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+
+ // Preempt the second attempt.
+ ContainerId amContainer2 =
+ ContainerId.newInstance(am2.getApplicationAttemptId(), 1);
+ scheduler.killContainer(scheduler.getRMContainer(amContainer2));
+
+ am2.waitForState(RMAppAttemptState.FAILED);
+ Assert.assertTrue(attempt2.shouldNotCountFailureToAttemptLimit());
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
+ Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt());
+
+ // mimic NM disk_failure
+ ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
+ containerStatus.setContainerId(attempt3.getMasterContainer().getId());
+ containerStatus.setDiagnostics("mimic NM disk_failure");
+ containerStatus.setState(ContainerState.COMPLETE);
+ containerStatus.setExitStatus(ContainerExitStatus.DISKS_FAILED);
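+ // Deliver the DISKS_FAILED container status directly to the attempt so
+ // that the AM failure is recorded as a disk failure.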
+ RMAppAttemptContainerFinishedEvent event =
+ new RMAppAttemptContainerFinishedEvent(attempt3.getAppAttemptId(),
+ containerStatus);
+ attempt3.handle(event);
+ am3.waitForState(RMAppAttemptState.FAILED);
+ Assert.assertTrue(attempt3.shouldNotCountFailureToAttemptLimit());
+
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ MockAM am4 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ RMAppAttempt attempt4 = app1.getCurrentAppAttempt();
+ Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
+
+ // fail the AM normally
+ nm1.nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am4.waitForState(RMAppAttemptState.FAILED);
+ Assert.assertFalse(attempt4.shouldNotCountFailureToAttemptLimit());
+
+ // AM should not be restarted.
+ rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
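+ // Four attempts were made in total, but only the last (normal) failure
+ // counted towards the max-am-attempts limit of 1.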
+ Assert.assertEquals(4, app1.getAppAttempts().size());
+ rm1.stop();
+ }
+
+ // Test that after the AM container is preempted, or after a regular RM
+ // restart/failover, the new RM does not count the AM failure towards the
+ // max-retry-count and is able to re-launch the AM.
+ @Test(timeout = 50000)
+ public void testShouldNotCountFailureToAttemptLimitOnRMRestart()
+ throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ // explicitly set max-am-retry count as 1.
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ RMApp app1 = rm1.submitApp(200);
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ CapacityScheduler scheduler =
+ (CapacityScheduler) rm1.getResourceScheduler();
+ ContainerId amContainer =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 1);
+
+ // Forcibly preempt the AM container.
+ scheduler.killContainer(scheduler.getRMContainer(amContainer));
+
+ am1.waitForState(RMAppAttemptState.FAILED);
+ Assert.assertTrue(attempt1.shouldNotCountFailureToAttemptLimit());
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+ // state store has 1 attempt stored.
+ ApplicationState appState =
+ memStore.getState().getApplicationState().get(app1.getApplicationId());
+ Assert.assertEquals(1, appState.getAttemptCount());
+ // attempt stored has the preempted container exit status.
+ Assert.assertEquals(ContainerExitStatus.PREEMPTED,
+ appState.getAttempt(am1.getApplicationAttemptId())
+ .getAMContainerExitStatus());
+
+ // AM should be restarted even though max-am-attempt is 1.
+ MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
+ Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+
+ // Restart rm.
+ MockRM rm2 = new MockRM(conf, memStore);
+ rm2.start();
+
+ // re-register the NM
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
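+ // The re-registering NM reports the AM container as killed by the RM,
+ // mimicking an AM container killed during RM restart/failover; this
+ // failure should not be counted towards the attempt limit.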
+ NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
+ status.setContainerExitStatus(
+ ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
+ status.setContainerId(attempt2.getMasterContainer().getId());
+ status.setContainerState(ContainerState.COMPLETE);
+ status.setDiagnostics("");
+ nm1.registerNode(Collections.singletonList(status), null);
+
+ rm2.waitForState(attempt2.getAppAttemptId(), RMAppAttemptState.FAILED);
+ // Will automatically start the third AppAttempt
+ rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ MockAM am3 =
+ rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
+ MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am3);
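+ // The third attempt finished successfully, so shouldNotCountFailureToAttemptLimit()
+ // returns false and the stored AM container exit status stays INVALID.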
+ RMAppAttempt attempt3 =
+ rm2.getRMContext().getRMApps().get(app1.getApplicationId())
+ .getCurrentAppAttempt();
+ Assert.assertFalse(attempt3.shouldNotCountFailureToAttemptLimit());
+ Assert.assertEquals(ContainerExitStatus.INVALID,
+ appState.getAttempt(am3.getApplicationAttemptId())
+ .getAMContainerExitStatus());
+
+ rm1.stop();
+ rm2.stop();
+ }
}