diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index a8a0af4..e943f02 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -766,22 +766,8 @@ public AttemptFailedTransition(RMAppState initialState) { public RMAppState transition(RMAppImpl app, RMAppEvent event) { RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent) event); - boolean retryApp = true; - String msg = null; - if (app.submissionContext.getUnmanagedAM()) { - // RM does not manage the AM. Do not retry - retryApp = false; - msg = "Unmanaged application " + app.getApplicationId() - + " failed due to " + failedEvent.getDiagnostics() - + ". Failing the application."; - } else if (app.attempts.size() >= app.maxAppAttempts) { - retryApp = false; - msg = "Application " + app.getApplicationId() + " failed " - + app.maxAppAttempts + " times due to " + failedEvent.getDiagnostics() - + ". Failing the application."; - } - - if (retryApp) { + StringBuilder msg = new StringBuilder(); + if (app.shouldRetry(msg, failedEvent.getDiagnostics())) { app.createNewAttempt(true); return initialState; } else { @@ -795,6 +781,27 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { } + public boolean shouldRetry(StringBuilder msg, String diagnostics) { + boolean retryApp = true; + if (submissionContext.getUnmanagedAM()) { + // RM does not manage the AM. Do not retry + retryApp = false; + if (msg != null) { + msg.append("Unmanaged application " + applicationId + + " failed due to " + diagnostics + + ". Failing the application."); + } + } else if (attempts.size() >= maxAppAttempts) { + retryApp = false; + if (msg != null) { + msg.append("Application " + applicationId + " failed " + + maxAppAttempts + " times due to " + diagnostics + + ". Failing the application."); + } + } + return retryApp; + } + @Override public String getApplicationType() { return this.applicationType; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index b5b22b6..5d38605 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.resource.Resources; @@ -56,6 +57,7 @@ Queue queue; final String user; private final AtomicInteger containerIdCounter = new AtomicInteger(0); + private final RMContext rmContext; final Set priorities = new TreeSet( new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator()); @@ -70,13 +72,15 @@ boolean pending = true; // for app metrics public AppSchedulingInfo(ApplicationAttemptId appAttemptId, - String user, Queue queue, ActiveUsersManager activeUsersManager) { + String user, Queue queue, ActiveUsersManager activeUsersManager, + RMContext rmContext) { this.applicationAttemptId = appAttemptId; this.applicationId = appAttemptId.getApplicationId(); this.queue = queue; this.queueName = queue.getQueueName(); this.user = user; this.activeUsersManager = activeUsersManager; + this.rmContext = rmContext; } public ApplicationId getApplicationId() { @@ -95,6 +99,10 @@ public String getUser() { return user; } + public RMContext getRMContext() { + return rmContext; + } + public synchronized boolean isPending() { return pending; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 9d2c739..2a1086f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; @@ -57,7 +58,7 @@ @Metric("# of pending apps") MutableGaugeInt appsPending; @Metric("# of apps completed") MutableCounterInt appsCompleted; @Metric("# of apps killed") MutableCounterInt appsKilled; - @Metric("# of apps failed") MutableGaugeInt appsFailed; + @Metric("# of apps failed") MutableCounterInt appsFailed; @Metric("Allocated memory in MB") MutableGaugeInt allocatedMB; @Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores; @@ -217,8 +218,6 @@ public void getMetrics(MetricsCollector collector, boolean all) { public void submitApp(String user, int attemptId) { if (attemptId == 1) { appsSubmitted.incr(); - } else { - appsFailed.decr(); } appsPending.incr(); QueueMetrics userMetrics = getUserMetrics(user); @@ -248,7 +247,16 @@ public void finishApp(AppSchedulingInfo app, runBuckets.remove(app.getApplicationId()); switch (rmAppAttemptFinalState) { case KILLED: appsKilled.incr(); break; - case FAILED: appsFailed.incr(); break; + case FAILED: + { + RMAppImpl rmapp = (RMAppImpl) app.getRMContext().getRMApps() + .get(app.getApplicationId()); + // Increment only when it's the last attempt + if (!rmapp.shouldRetry(null, null)) { + appsFailed.incr(); + } + break; + } default: appsCompleted.incr(); break; } if (app.isPending()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index b93965c..6b548d0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -123,7 +123,7 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, this.rmContext = rmContext; this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user, queue, - activeUsersManager); + activeUsersManager, rmContext); this.queue = queue; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index 8b5d454..36848d7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -111,7 +111,7 @@ public FSSchedulerApp(ApplicationAttemptId applicationAttemptId, this.rmContext = rmContext; this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user, queue, - activeUsersManager); + activeUsersManager, rmContext); this.queue = queue; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java index b3702ad..5fea890 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java @@ -24,9 +24,13 @@ import static org.apache.hadoop.test.MockitoMaker.make; import static org.apache.hadoop.test.MockitoMaker.stub; import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsSource; @@ -37,6 +41,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -64,7 +71,7 @@ public void setUp() { QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false, conf); MetricsSource queueSource= queueSource(ms, queueName); - AppSchedulingInfo app = mockApp(user); + AppSchedulingInfo app = mockApp(user, false); metrics.submitApp(user, 1); MetricsSource userSource = userSource(ms, queueName, user); @@ -89,16 +96,27 @@ public void setUp() { checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); assertNull(userSource); } - + @Test - public void testQueueAppMetricsForMultipleFailures() { + public void testQueueAppMetricsForMultipleFailures1() { + // Last attempt + testQueueAppMetricsForMultipleFailures(false); + } + + @Test + public void testQueueAppMetricsForMultipleFailures2() { + // Not last attempt + testQueueAppMetricsForMultipleFailures(true); + } + + private void testQueueAppMetricsForMultipleFailures(boolean shouldRetry) { String queueName = "single"; String user = "alice"; QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false, new Configuration()); MetricsSource queueSource = queueSource(ms, queueName); - AppSchedulingInfo app = mockApp(user); + AppSchedulingInfo app = mockApp(user, shouldRetry); metrics.submitApp(user, 1); MetricsSource userSource = userSource(ms, queueName, user); @@ -108,31 +126,31 @@ public void testQueueAppMetricsForMultipleFailures() { checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); metrics.finishApp(app, RMAppAttemptState.FAILED); - checkApps(queueSource, 1, 0, 0, 0, 1, 0, true); + checkApps(queueSource, 1, 0, 0, 0, (shouldRetry ? 0 : 1), 0, true); // As the application has failed, framework retries the same application // based on configuration metrics.submitApp(user, 2); - checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); + checkApps(queueSource, 1, 1, 0, 0, (shouldRetry ? 0 : 1), 0, true); metrics.incrAppsRunning(app, user); - checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); + checkApps(queueSource, 1, 0, 1, 0, (shouldRetry ? 0 : 1), 0, true); // Suppose say application has failed this time as well. metrics.finishApp(app, RMAppAttemptState.FAILED); - checkApps(queueSource, 1, 0, 0, 0, 1, 0, true); + checkApps(queueSource, 1, 0, 0, 0, (shouldRetry ? 0 : 2), 0, true); // As the application has failed, framework retries the same application // based on configuration metrics.submitApp(user, 3); - checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); + checkApps(queueSource, 1, 1, 0, 0, (shouldRetry ? 0 : 2), 0, true); metrics.incrAppsRunning(app, user); - checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); + checkApps(queueSource, 1, 0, 1, 0, (shouldRetry ? 0 : 2), 0, true); // Suppose say application has finished. metrics.finishApp(app, RMAppAttemptState.FINISHED); - checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); + checkApps(queueSource, 1, 0, 0, 1, (shouldRetry ? 0 : 2), 0, true); assertNull(userSource); } @@ -144,7 +162,7 @@ public void testQueueAppMetricsForMultipleFailures() { QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, true, conf); MetricsSource queueSource = queueSource(ms, queueName); - AppSchedulingInfo app = mockApp(user); + AppSchedulingInfo app = mockApp(user, false); metrics.submitApp(user, 1); MetricsSource userSource = userSource(ms, queueName, user); @@ -190,7 +208,7 @@ public void testQueueAppMetricsForMultipleFailures() { QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, conf); MetricsSource parentQueueSource = queueSource(ms, parentQueueName); MetricsSource queueSource = queueSource(ms, leafQueueName); - AppSchedulingInfo app = mockApp(user); + AppSchedulingInfo app = mockApp(user, false); metrics.submitApp(user, 1); MetricsSource userSource = userSource(ms, leafQueueName, user); @@ -308,7 +326,7 @@ public static void checkApps(MetricsSource source, int submitted, int pending, assertGauge("AppsPending", pending, rb); assertGauge("AppsRunning", running, rb); assertCounter("AppsCompleted", completed, rb); - assertGauge("AppsFailed", failed, rb); + assertCounter("AppsFailed", failed, rb); assertCounter("AppsKilled", killed, rb); } @@ -333,12 +351,22 @@ public static void checkResources(MetricsSource source, int allocatedMB, assertGauge("ReservedContainers", reservedCtnrs, rb); } - private static AppSchedulingInfo mockApp(String user) { + private static AppSchedulingInfo mockApp(String user, boolean shouldRetry) { AppSchedulingInfo app = mock(AppSchedulingInfo.class); when(app.getUser()).thenReturn(user); ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + when(app.getApplicationId()).thenReturn(appId); ApplicationAttemptId id = BuilderUtils.newApplicationAttemptId(appId, 1); when(app.getApplicationAttemptId()).thenReturn(id); + RMContext rmContext = mock(RMContext.class); + when(app.getRMContext()).thenReturn(rmContext); + RMAppImpl rmapp = mock(RMAppImpl.class); + ConcurrentMap map = + new ConcurrentHashMap(); + map.put(appId, rmapp); + when(rmContext.getRMApps()).thenReturn(map); + when(rmapp.shouldRetry( + any(StringBuilder.class), any(String.class))).thenReturn(shouldRetry); return app; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 4f4bf2f..61b2fb4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -38,11 +38,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -55,6 +57,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -94,11 +98,18 @@ private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); + @SuppressWarnings("unchecked") @Before public void setUp() throws Exception { CapacityScheduler spyCs = new CapacityScheduler(); cs = spy(spyCs); - rmContext = TestUtils.getMockRMContext(); + rmContext = spy(TestUtils.getMockRMContext()); + ConcurrentMap map = mock(ConcurrentMap.class); + RMAppImpl rmapp = mock(RMAppImpl.class); + when(rmapp.shouldRetry(any(StringBuilder.class), + any(String.class))).thenReturn(false); + when(map.get(any(ApplicationId.class))).thenReturn(rmapp); + when(rmContext.getRMApps()).thenReturn(map); csConf = new CapacitySchedulerConfiguration(); @@ -372,7 +383,7 @@ public void testAppAttemptMetrics() throws Exception { assertEquals(1, a.getMetrics().getAppsSubmitted()); assertEquals(0, a.getMetrics().getAppsPending()); - assertEquals(0, a.getMetrics().getAppsFailed()); + assertEquals(1, a.getMetrics().getAppsFailed()); assertEquals(1, a.getMetrics().getAppsCompleted()); QueueMetrics userMetrics = a.getMetrics().getUserMetrics(user_0);