diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index d57f22ccdb5..4474a10751e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -903,7 +903,6 @@ public int getMaxAppAttempts() { @Override public void handle(RMAppEvent event) { - this.writeLock.lock(); try { @@ -1459,8 +1458,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { } } - private static final class AppRejectedTransition extends - FinalTransition{ + private static final class AppRejectedTransition extends FinalTransition { public AppRejectedTransition() { super(RMAppState.FAILED); } @@ -1502,39 +1500,50 @@ static void appAdminClientCleanUp(RMAppImpl app) { private final RMAppState finalState; - public FinalTransition(RMAppState finalState) { + FinalTransition(RMAppState finalState) { this.finalState = finalState; } + @Override public void transition(RMAppImpl app, RMAppEvent event) { app.logAggregationStartTime = app.systemClock.getTime(); + completeAndCleanupApp(app); + handleAppFinished(app); + app.clearUnusedFields(); + appAdminClientCleanUp(app); + } + + private void completeAndCleanupApp(RMAppImpl app) { + //cleanup app in RM Nodes for (NodeId nodeId : app.getRanNodes()) { app.handler.handle( - new RMNodeCleanAppEvent(nodeId, app.applicationId)); - } - app.finishTime = app.storedFinishTime; - if (app.finishTime == 0 ) { - app.finishTime = app.systemClock.getTime(); + new RMNodeCleanAppEvent(nodeId, app.applicationId)); } // Recovered apps that are completed were not added to scheduler, so no // need to remove them from scheduler. if (app.recoveredFinalState == null) { app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, - finalState)); + finalState)); } - app.handler.handle( - new RMAppManagerEvent(app.applicationId, - RMAppManagerEventType.APP_COMPLETED)); + // Send app completed event to AppManager + app.handler.handle(new RMAppManagerEvent(app.applicationId, + RMAppManagerEventType.APP_COMPLETED)); + } + + private void handleAppFinished(RMAppImpl app) { + // record finish time + app.finishTime = app.storedFinishTime; + if (app.finishTime == 0) { + app.finishTime = app.systemClock.getTime(); + } + + //record finish in history and metrics app.rmContext.getRMApplicationHistoryWriter() .applicationFinished(app, finalState); app.rmContext.getSystemMetricsPublisher() .appFinished(app, finalState, app.finishTime); - // set the memory free - app.clearUnusedFields(); - - appAdminClientCleanUp(app); - }; + } } public int getNumFailedAppAttempts() { @@ -1549,7 +1558,7 @@ public int getNumFailedAppAttempts() { return completedAttempts; } - private static final class AttemptFailedTransition implements + private static final class AttemptFailedTransition implements MultipleArcTransition { private final RMAppState initialState; @@ -1812,8 +1821,8 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { == LogAggregationStatus.TIME_OUT && report.getLogAggregationStatus() == LogAggregationStatus.RUNNING) { - // If the log aggregation status got from latest nm heartbeat - // is Running, and current log aggregation status is TimeOut, + // If the log aggregation status got from latest NM heartbeat + // is RUNNING, and current log aggregation status is TIME_OUT, // based on whether there are any failure messages for this NM, // we will reset the log aggregation status as RUNNING or // RUNNING_WITH_FAILURE @@ -2137,4 +2146,10 @@ protected void onInvalidStateTransition(RMAppEventType rmAppEventType, RMAppState state){ /* TODO fail the application on the failed transition */ } + + @VisibleForTesting + public long getLogAggregationStartTime() { + return logAggregationStartTime; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 70887e0f4bf..04d5ade72ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -18,28 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; - +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -54,7 +33,6 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -92,6 +70,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; @@ -111,6 +91,31 @@ import org.mockito.ArgumentCaptor; import org.mockito.Matchers; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + @RunWith(value = Parameterized.class) public class TestRMAppTransitions { @@ -128,13 +133,14 @@ private SystemMetricsPublisher publisher; private YarnScheduler scheduler; private TestSchedulerEventDispatcher schedulerDispatcher; + private TestApplicationManagerEventDispatcher appManagerDispatcher; // ignore all the RM application attempt events private static final class TestApplicationAttemptEventDispatcher implements - EventHandler { + EventHandler { private final RMContext rmContext; - public TestApplicationAttemptEventDispatcher(RMContext rmContext) { + TestApplicationAttemptEventDispatcher(RMContext rmContext) { this.rmContext = rmContext; } @@ -158,7 +164,7 @@ public void handle(RMAppAttemptEvent event) { EventHandler { private final RMContext rmContext; - public TestApplicationEventDispatcher(RMContext rmContext) { + TestApplicationEventDispatcher(RMContext rmContext) { this.rmContext = rmContext; } @@ -181,19 +187,37 @@ public void handle(RMAppEvent event) { // ResourceManager.java private static final class TestApplicationManagerEventDispatcher implements EventHandler { + List events = Lists.newArrayList(); @Override public void handle(RMAppManagerEvent event) { + LOG.info("Handling app manager event: " + event); + events.add(event); } } // handle all the scheduler events - same as in ResourceManager.java private static final class TestSchedulerEventDispatcher implements EventHandler { - public SchedulerEvent lastSchedulerEvent; + List events = Lists.newArrayList();; @Override public void handle(SchedulerEvent event) { - lastSchedulerEvent = event; + LOG.info("Handling scheduler event: " + event); + events.add(event); + } + + SchedulerEvent getLastEvent() { + return events.get(events.size() - 1); + } + + SchedulerEvent getLastEventByType(Class type) { + for (int i = events.size() - 1; i >= 0; i--) { + SchedulerEvent event = events.get(i); + if (type.isInstance(event)) { + return event; + } + } + return null; } } @@ -243,7 +267,7 @@ public void setUp() throws Exception { ResourceScheduler resourceScheduler = mock(ResourceScheduler.class); doReturn(null).when(resourceScheduler) - .getAppResourceUsageReport((ApplicationAttemptId)Matchers.any()); + .getAppResourceUsageReport(Matchers.any()); doReturn(resourceScheduler).when(rmContext).getScheduler(); doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext) @@ -254,9 +278,11 @@ public void setUp() throws Exception { rmDispatcher.register(RMAppEventType.class, new TestApplicationEventDispatcher(rmContext)); - + + appManagerDispatcher = new + TestApplicationManagerEventDispatcher(); rmDispatcher.register(RMAppManagerEventType.class, - new TestApplicationManagerEventDispatcher()); + appManagerDispatcher); schedulerDispatcher = new TestSchedulerEventDispatcher(); rmDispatcher.register(SchedulerEventType.class, @@ -307,14 +333,13 @@ private ByteBuffer getTokensConf() throws IOException { localRes.setType(LocalResourceType.FILE); localRes.setTimestamp(scriptFile.lastModified()); String destinationFile = "dest_file"; - Map localResources = - new HashMap(); + Map localResources = new HashMap<>(); localResources.put(destinationFile, localRes); return localResources; } private Map getEnvironment() { - Map userSetEnv = new HashMap(); + Map userSetEnv = new HashMap<>(); userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id"); userSetEnv.put(Environment.NM_HOST.name(), "user_set_NM_HOST"); userSetEnv.put(Environment.NM_PORT.name(), "user_set_NM_PORT"); @@ -332,12 +357,12 @@ private ContainerRetryContext getContainerRetryContext() { ContainerRetryContext containerRetryContext = ContainerRetryContext .newInstance( ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES, - new HashSet<>(Arrays.asList(Integer.valueOf(111))), 0, 0); + new HashSet<>(Arrays.asList(111)), 0, 0); return containerRetryContext; } private Map getServiceData() { - Map serviceData = new HashMap(); + Map serviceData = new HashMap<>(); String serviceName = "non_exist_auxService"; serviceData.put(serviceName, ByteBuffer.wrap(serviceName.getBytes())); return serviceData; @@ -371,7 +396,7 @@ private LogAggregationContext getLogAggregationContext() { return logAggregationContext; } - protected RMApp createNewTestApp(ApplicationSubmissionContext + private RMApp createNewTestApp(ApplicationSubmissionContext submissionContext) throws IOException { ApplicationId applicationId = MockApps.newAppID(appId++); String user = MockApps.newUserName(); @@ -397,7 +422,7 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext RMApp application = new RMAppImpl(applicationId, rmContext, conf, name, user, queue, submissionContext, scheduler, masterService, System.currentTimeMillis(), "YARN", null, - new ArrayList()); + new ArrayList<>()); testAppStartState(applicationId, user, name, queue, application); this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), @@ -424,8 +449,8 @@ private static void testAppStartState(ApplicationId applicationId, name, application.getName()); Assert.assertEquals("application finish time is not 0 and should be", 0, application.getFinishTime()); - Assert.assertEquals("application tracking url is not correct", - null, application.getTrackingUrl()); + Assert.assertNull("application tracking url is not correct", + application.getTrackingUrl()); StringBuilder diag = application.getDiagnostics(); Assert.assertEquals("application diagnostics is not correct", 0, diag.length()); @@ -500,7 +525,7 @@ private void sendAttemptUpdateSavedEvent(RMApp application) { rmDispatcher.await(); } - protected RMApp testCreateAppNewSaving( + private RMApp testCreateAppNewSaving( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = createNewTestApp(submissionContext); // NEW => NEW_SAVING event RMAppEventType.START @@ -515,7 +540,7 @@ protected RMApp testCreateAppNewSaving( return application; } - protected RMApp testCreateAppSubmittedNoRecovery( + private RMApp testCreateAppSubmittedNoRecovery( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = testCreateAppNewSaving(submissionContext); // NEW_SAVING => SUBMITTED event RMAppEventType.APP_NEW_SAVED @@ -532,7 +557,7 @@ protected RMApp testCreateAppSubmittedNoRecovery( return application; } - protected RMApp testCreateAppSubmittedRecovery( + private RMApp testCreateAppSubmittedRecovery( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = createNewTestApp(submissionContext); // NEW => SUBMITTED event RMAppEventType.RECOVER @@ -550,7 +575,7 @@ protected RMApp testCreateAppSubmittedRecovery( return application; } - protected RMApp testCreateAppAccepted( + private RMApp testCreateAppAccepted( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = testCreateAppSubmittedNoRecovery(submissionContext); // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED @@ -563,7 +588,7 @@ protected RMApp testCreateAppAccepted( return application; } - protected RMApp testCreateAppRunning( + private RMApp testCreateAppRunning( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = testCreateAppAccepted(submissionContext); // ACCEPTED => RUNNING event RMAppEventType.ATTEMPT_REGISTERED @@ -577,7 +602,7 @@ protected RMApp testCreateAppRunning( return application; } - protected RMApp testCreateAppFinalSaving( + private RMApp testCreateAppFinalSaving( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = testCreateAppRunning(submissionContext); RMAppEvent finishingEvent = @@ -589,7 +614,7 @@ protected RMApp testCreateAppFinalSaving( return application; } - protected RMApp testCreateAppFinishing( + private RMApp testCreateAppFinishing( ApplicationSubmissionContext submissionContext) throws IOException { // unmanaged AMs don't use the FINISHING state assert submissionContext == null || !submissionContext.getUnmanagedAM(); @@ -603,11 +628,11 @@ protected RMApp testCreateAppFinishing( return application; } - protected RMApp testCreateAppFinished( + private RMApp testCreateAppFinished( ApplicationSubmissionContext submissionContext, String diagnostics) throws IOException { // unmanaged AMs don't use the FINISHING state - RMApp application = null; + RMApp application; if (submissionContext != null && submissionContext.getUnmanagedAM()) { application = testCreateAppRunning(submissionContext); } else { @@ -1152,7 +1177,7 @@ public void testAppKilledKilled() throws IOException { } @Test (timeout = 30000) - public void testAppStartAfterKilled() throws IOException { + public void testAppStartAfterKilled() { LOG.info("--- START: testAppStartAfterKilled ---"); ApplicationId applicationId = MockApps.newAppID(appId++); @@ -1162,8 +1187,8 @@ public void testAppStartAfterKilled() throws IOException { @Override protected void onInvalidStateTransition(RMAppEventType rmAppEventType, RMAppState state) { - Assert.assertTrue("RMAppImpl: can't handle " + rmAppEventType - + " at state " + state, false); + Assert.fail("RMAppImpl: can't handle " + rmAppEventType + + " at state " + state); } }; @@ -1199,9 +1224,8 @@ public void testAppsRecoveringStates() throws Exception { } } - public void testRecoverApplication(ApplicationStateData appState, - RMState rmState) - throws Exception { + private void testRecoverApplication(ApplicationStateData appState, + RMState rmState) { ApplicationSubmissionContext submissionContext = appState.getApplicationSubmissionContext(); RMAppImpl application = @@ -1232,7 +1256,7 @@ public void testRecoverApplication(ApplicationStateData appState, verifyRMAppFieldsForFinalTransitions(application); } - public void createRMStateForApplications( + private void createRMStateForApplications( Map applicationState, RMAppState rmAppState) throws IOException { RMApp app = createNewTestApp(null); @@ -1248,15 +1272,94 @@ public void testGetAppReport() throws IOException { RMApp app = createNewTestApp(null); assertAppState(RMAppState.NEW, app); ApplicationReport report = app.createAndGetApplicationReport(null, true); - Assert.assertNotNull(report.getApplicationResourceUsageReport()); + assertNotNull(report.getApplicationResourceUsageReport()); Assert.assertEquals(report.getApplicationResourceUsageReport(),RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT); report = app.createAndGetApplicationReport("clientuser", true); - Assert.assertNotNull(report.getApplicationResourceUsageReport()); + assertNotNull(report.getApplicationResourceUsageReport()); Assert.assertTrue("bad proxy url for app", report.getTrackingUrl().endsWith("/proxy/" + app.getApplicationId() + "/")); } + @Test + public void testFinalTransition() throws IOException { + final long testCaseStartTime = System.currentTimeMillis(); + RMApp application = testCreateAppFinishing(null); + verifyAppBeforeFinishEvent(application, testCaseStartTime); + + RMAppEvent event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_FINISHED); + application.handle(event); + rmDispatcher.await(); + + verifyAppAfterFinishEvent(application, testCaseStartTime); + } + + private void verifyAppBeforeFinishEvent(RMApp app, long testCaseStartTime) { + assertEquals(0L, ((RMAppImpl) app).getLogAggregationStartTime()); + //RMAppEventType.APP_UPDATE_SAVED sets the finish time + assertTrue(app.getFinishTime() > testCaseStartTime); + + assertTrue("App manager events should not be received!", + appManagerDispatcher.events.isEmpty()); + } + + private void verifyAppAfterFinishEvent(RMApp app, long testCaseStartTime) { + assertTrue( + testCaseStartTime < ((RMAppImpl) app).getLogAggregationStartTime()); + assertTrue(testCaseStartTime < app.getFinishTime()); + verifyAppCompletedEvent(app); + verifyAppRemovedEvent(app); + } + + private void verifyAppCompletedEvent(RMApp app) { + assertEquals(1, appManagerDispatcher.events.size()); + RMAppManagerEvent rmAppManagerEvent = appManagerDispatcher.events.get(0); + assertEquals(RMAppManagerEventType.APP_COMPLETED, + rmAppManagerEvent.getType()); + assertEquals(app.getApplicationId().getId(), + rmAppManagerEvent.getApplicationId().getId()); + } + + private void verifyAppRemovedEvent(RMApp app) { + SchedulerEvent lastEvent = schedulerDispatcher.getLastEvent(); + SchedulerEvent appRemovedEvent = schedulerDispatcher + .getLastEventByType(AppRemovedSchedulerEvent.class); + assertNotNull("There should be an " + + AppRemovedSchedulerEvent.class.getCanonicalName() + + " in the sequence of scheduler events!", appRemovedEvent); + if (!(lastEvent instanceof AppRemovedSchedulerEvent)) { + handleAppAttemptFollowsAppRemoval(lastEvent, appRemovedEvent); + } + + AppRemovedSchedulerEvent event = + (AppRemovedSchedulerEvent) appRemovedEvent; + assertEquals(SchedulerEventType.APP_REMOVED, event.getType()); + assertEquals(app.getApplicationId().getId(), + event.getApplicationID().getId()); + assertEquals(RMAppState.FINISHED, event.getFinalState()); + } + + /** + * The normal sequence of events: + * APP_ADDED, APP_ATTEMPT_ADDED, APP_REMOVED, APP_COMPLETED.
+ * Sometimes, app is removed before any app attempt is added, like this: + * APP_ADDED, APP_REMOVED, APP_ATTEMPT_ADDED. + * @param lastEvent + * @param appRemovedEvent + */ + private void handleAppAttemptFollowsAppRemoval(SchedulerEvent lastEvent, + SchedulerEvent appRemovedEvent) { + assertNotNull("One before the last captured event " + + "should be an instance of " + + AppRemovedSchedulerEvent.class.getCanonicalName(), + appRemovedEvent); + assertTrue("Last captured event " + lastEvent + + " should be an instance of " + + AppAttemptAddedSchedulerEvent.class.getCanonicalName(), + lastEvent instanceof AppAttemptAddedSchedulerEvent); + } + private void verifyApplicationFinished(RMAppState state) { ArgumentCaptor finalState = ArgumentCaptor.forClass(RMAppState.class); @@ -1269,12 +1372,11 @@ private void verifyApplicationFinished(RMAppState state) { } private void verifyAppRemovedSchedulerEvent(RMAppState finalState) { - Assert.assertEquals(SchedulerEventType.APP_REMOVED, - schedulerDispatcher.lastSchedulerEvent.getType()); - if(schedulerDispatcher.lastSchedulerEvent instanceof - AppRemovedSchedulerEvent) { + SchedulerEvent lastEvent = schedulerDispatcher.getLastEvent(); + Assert.assertEquals(SchedulerEventType.APP_REMOVED, lastEvent.getType()); + if (lastEvent instanceof AppRemovedSchedulerEvent) { AppRemovedSchedulerEvent appRemovedEvent = - (AppRemovedSchedulerEvent) schedulerDispatcher.lastSchedulerEvent; + (AppRemovedSchedulerEvent) lastEvent; Assert.assertEquals(finalState, appRemovedEvent.getFinalState()); } }