diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index d57f22ccdb5..bd645d7c733 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -903,7 +903,6 @@ public int getMaxAppAttempts() { @Override public void handle(RMAppEvent event) { - this.writeLock.lock(); try { @@ -1459,8 +1458,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { } } - private static final class AppRejectedTransition extends - FinalTransition{ + private static final class AppRejectedTransition extends FinalTransition { public AppRejectedTransition() { super(RMAppState.FAILED); } @@ -1502,39 +1500,50 @@ static void appAdminClientCleanUp(RMAppImpl app) { private final RMAppState finalState; - public FinalTransition(RMAppState finalState) { + FinalTransition(RMAppState finalState) { this.finalState = finalState; } + @Override public void transition(RMAppImpl app, RMAppEvent event) { - app.logAggregationStartTime = app.systemClock.getTime(); + completeAndCleanupApp(app); + handleAppFinished(app); + app.clearUnusedFields(); + appAdminClientCleanUp(app); + } + + private void completeAndCleanupApp(RMAppImpl app) { + //cleanup app in RM Nodes for (NodeId nodeId : app.getRanNodes()) { app.handler.handle( - new RMNodeCleanAppEvent(nodeId, app.applicationId)); - } - app.finishTime = app.storedFinishTime; - if (app.finishTime == 0 ) { - app.finishTime = app.systemClock.getTime(); + new RMNodeCleanAppEvent(nodeId, app.applicationId)); } // Recovered apps that are completed were not added to scheduler, so no // need to remove them from scheduler. if (app.recoveredFinalState == null) { app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, - finalState)); + finalState)); + } + + // Send app completed event to AppManager + app.handler.handle(new RMAppManagerEvent(app.applicationId, + RMAppManagerEventType.APP_COMPLETED)); + } + + private void handleAppFinished(RMAppImpl app) { + app.logAggregationStartTime = app.systemClock.getTime(); + // record finish time + app.finishTime = app.storedFinishTime; + if (app.finishTime == 0) { + app.finishTime = app.systemClock.getTime(); } - app.handler.handle( - new RMAppManagerEvent(app.applicationId, - RMAppManagerEventType.APP_COMPLETED)); + //record finish in history and metrics app.rmContext.getRMApplicationHistoryWriter() .applicationFinished(app, finalState); app.rmContext.getSystemMetricsPublisher() .appFinished(app, finalState, app.finishTime); - // set the memory free - app.clearUnusedFields(); - - appAdminClientCleanUp(app); - }; + } } public int getNumFailedAppAttempts() { @@ -1550,7 +1559,7 @@ public int getNumFailedAppAttempts() { } private static final class AttemptFailedTransition implements - MultipleArcTransition { + MultipleArcTransition { private final RMAppState initialState; @@ -1812,8 +1821,8 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { == LogAggregationStatus.TIME_OUT && report.getLogAggregationStatus() == LogAggregationStatus.RUNNING) { - // If the log aggregation status got from latest nm heartbeat - // is Running, and current log aggregation status is TimeOut, + // If the log aggregation status got from latest NM heartbeat + // is RUNNING, and current log aggregation status is TIME_OUT, // based on whether there are any failure messages for this NM, // we will reset the log aggregation status as RUNNING or // RUNNING_WITH_FAILURE @@ -2137,4 +2146,10 @@ protected void onInvalidStateTransition(RMAppEventType rmAppEventType, RMAppState state){ /* TODO fail the application on the failed transition */ } + + @VisibleForTesting + public long getLogAggregationStartTime() { + return logAggregationStartTime; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 70887e0f4bf..73db66e5f55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -18,28 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; - +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -54,7 +33,6 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -89,7 +67,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; + +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt + .RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; + + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; @@ -111,6 +94,32 @@ import org.mockito.ArgumentCaptor; import org.mockito.Matchers; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + @RunWith(value = Parameterized.class) public class TestRMAppTransitions { @@ -128,13 +137,15 @@ private SystemMetricsPublisher publisher; private YarnScheduler scheduler; private TestSchedulerEventDispatcher schedulerDispatcher; + private TestApplicationManagerEventDispatcher appManagerDispatcher; + private long testCaseStartTime; // ignore all the RM application attempt events private static final class TestApplicationAttemptEventDispatcher implements - EventHandler { + EventHandler { private final RMContext rmContext; - public TestApplicationAttemptEventDispatcher(RMContext rmContext) { + TestApplicationAttemptEventDispatcher(RMContext rmContext) { this.rmContext = rmContext; } @@ -158,7 +169,7 @@ public void handle(RMAppAttemptEvent event) { EventHandler { private final RMContext rmContext; - public TestApplicationEventDispatcher(RMContext rmContext) { + TestApplicationEventDispatcher(RMContext rmContext) { this.rmContext = rmContext; } @@ -181,19 +192,27 @@ public void handle(RMAppEvent event) { // ResourceManager.java private static final class TestApplicationManagerEventDispatcher implements EventHandler { + List events = Lists.newArrayList(); @Override public void handle(RMAppManagerEvent event) { + LOG.info("Handling app manager event: " + event); + events.add(event); } } // handle all the scheduler events - same as in ResourceManager.java private static final class TestSchedulerEventDispatcher implements EventHandler { - public SchedulerEvent lastSchedulerEvent; + List events = Lists.newArrayList();; @Override public void handle(SchedulerEvent event) { - lastSchedulerEvent = event; + LOG.info("Handling scheduler event: " + event); + events.add(event); + } + + SchedulerEvent getLastEvent() { + return events.get(events.size() - 1); } } @@ -243,7 +262,7 @@ public void setUp() throws Exception { ResourceScheduler resourceScheduler = mock(ResourceScheduler.class); doReturn(null).when(resourceScheduler) - .getAppResourceUsageReport((ApplicationAttemptId)Matchers.any()); + .getAppResourceUsageReport(Matchers.any()); doReturn(resourceScheduler).when(rmContext).getScheduler(); doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext) @@ -254,9 +273,11 @@ public void setUp() throws Exception { rmDispatcher.register(RMAppEventType.class, new TestApplicationEventDispatcher(rmContext)); - + + appManagerDispatcher = new + TestApplicationManagerEventDispatcher(); rmDispatcher.register(RMAppManagerEventType.class, - new TestApplicationManagerEventDispatcher()); + appManagerDispatcher); schedulerDispatcher = new TestSchedulerEventDispatcher(); rmDispatcher.register(SchedulerEventType.class, @@ -264,6 +285,7 @@ public void setUp() throws Exception { rmDispatcher.init(conf); rmDispatcher.start(); + testCaseStartTime = System.currentTimeMillis(); } private ByteBuffer getTokens() throws IOException { @@ -307,14 +329,13 @@ private ByteBuffer getTokensConf() throws IOException { localRes.setType(LocalResourceType.FILE); localRes.setTimestamp(scriptFile.lastModified()); String destinationFile = "dest_file"; - Map localResources = - new HashMap(); + Map localResources = new HashMap<>(); localResources.put(destinationFile, localRes); return localResources; } private Map getEnvironment() { - Map userSetEnv = new HashMap(); + Map userSetEnv = new HashMap<>(); userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id"); userSetEnv.put(Environment.NM_HOST.name(), "user_set_NM_HOST"); userSetEnv.put(Environment.NM_PORT.name(), "user_set_NM_PORT"); @@ -332,12 +353,12 @@ private ContainerRetryContext getContainerRetryContext() { ContainerRetryContext containerRetryContext = ContainerRetryContext .newInstance( ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES, - new HashSet<>(Arrays.asList(Integer.valueOf(111))), 0, 0); + new HashSet<>(Arrays.asList(111)), 0, 0); return containerRetryContext; } private Map getServiceData() { - Map serviceData = new HashMap(); + Map serviceData = new HashMap<>(); String serviceName = "non_exist_auxService"; serviceData.put(serviceName, ByteBuffer.wrap(serviceName.getBytes())); return serviceData; @@ -371,7 +392,7 @@ private LogAggregationContext getLogAggregationContext() { return logAggregationContext; } - protected RMApp createNewTestApp(ApplicationSubmissionContext + private RMApp createNewTestApp(ApplicationSubmissionContext submissionContext) throws IOException { ApplicationId applicationId = MockApps.newAppID(appId++); String user = MockApps.newUserName(); @@ -397,7 +418,7 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext RMApp application = new RMAppImpl(applicationId, rmContext, conf, name, user, queue, submissionContext, scheduler, masterService, System.currentTimeMillis(), "YARN", null, - new ArrayList()); + new ArrayList<>()); testAppStartState(applicationId, user, name, queue, application); this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), @@ -424,19 +445,21 @@ private static void testAppStartState(ApplicationId applicationId, name, application.getName()); Assert.assertEquals("application finish time is not 0 and should be", 0, application.getFinishTime()); - Assert.assertEquals("application tracking url is not correct", - null, application.getTrackingUrl()); + Assert.assertNull("application tracking url is not correct", + application.getTrackingUrl()); StringBuilder diag = application.getDiagnostics(); Assert.assertEquals("application diagnostics is not correct", 0, diag.length()); } // test to make sure times are set when app finishes - private static void assertStartTimeSet(RMApp application) { + private void assertStartTimeSet(RMApp application) { Assert.assertTrue("application start time is not greater than 0", application.getStartTime() > 0); Assert.assertTrue("application start time is before currentTime", application.getStartTime() <= System.currentTimeMillis()); + Assert.assertTrue("application start time is before test case start time", + application.getStartTime() > testCaseStartTime); } private static void assertAppState(RMAppState state, RMApp application) { @@ -453,7 +476,7 @@ private static void assertFinalAppStatus(FinalApplicationStatus status, RMApp ap private void assertTimesAtFinish(RMApp application) { assertStartTimeSet(application); Assert.assertTrue("application finish time is not greater than 0", - (application.getFinishTime() > 0)); + (application.getFinishTime() > testCaseStartTime)); Assert.assertTrue("application finish time is not >= start time", (application.getFinishTime() >= application.getStartTime())); } @@ -500,7 +523,7 @@ private void sendAttemptUpdateSavedEvent(RMApp application) { rmDispatcher.await(); } - protected RMApp testCreateAppNewSaving( + private RMApp testCreateAppNewSaving( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = createNewTestApp(submissionContext); // NEW => NEW_SAVING event RMAppEventType.START @@ -515,7 +538,7 @@ protected RMApp testCreateAppNewSaving( return application; } - protected RMApp testCreateAppSubmittedNoRecovery( + private RMApp testCreateAppSubmittedNoRecovery( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = testCreateAppNewSaving(submissionContext); // NEW_SAVING => SUBMITTED event RMAppEventType.APP_NEW_SAVED @@ -532,13 +555,15 @@ protected RMApp testCreateAppSubmittedNoRecovery( return application; } - protected RMApp testCreateAppSubmittedRecovery( + private RMApp testCreateAppSubmittedRecovery( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = createNewTestApp(submissionContext); // NEW => SUBMITTED event RMAppEventType.RECOVER RMState state = new RMState(); + long startTime = testCaseStartTime + 1; ApplicationStateData appState = - ApplicationStateData.newInstance(123, 123, null, "user", null); + ApplicationStateData.newInstance(testCaseStartTime, startTime, null, + "user", null); state.getApplicationState().put(application.getApplicationId(), appState); RMAppEvent event = new RMAppRecoverEvent(application.getApplicationId(), state); @@ -550,7 +575,7 @@ protected RMApp testCreateAppSubmittedRecovery( return application; } - protected RMApp testCreateAppAccepted( + private RMApp testCreateAppAccepted( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = testCreateAppSubmittedNoRecovery(submissionContext); // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED @@ -563,7 +588,7 @@ protected RMApp testCreateAppAccepted( return application; } - protected RMApp testCreateAppRunning( + private RMApp testCreateAppRunning( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = testCreateAppAccepted(submissionContext); // ACCEPTED => RUNNING event RMAppEventType.ATTEMPT_REGISTERED @@ -577,7 +602,7 @@ protected RMApp testCreateAppRunning( return application; } - protected RMApp testCreateAppFinalSaving( + private RMApp testCreateAppFinalSaving( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = testCreateAppRunning(submissionContext); RMAppEvent finishingEvent = @@ -589,8 +614,9 @@ protected RMApp testCreateAppFinalSaving( return application; } - protected RMApp testCreateAppFinishing( - ApplicationSubmissionContext submissionContext) throws IOException { + private RMApp testCreateAppFinishing( + ApplicationSubmissionContext submissionContext) throws IOException, + InterruptedException { // unmanaged AMs don't use the FINISHING state assert submissionContext == null || !submissionContext.getUnmanagedAM(); RMApp application = testCreateAppFinalSaving(submissionContext); @@ -598,16 +624,18 @@ protected RMApp testCreateAppFinishing( RMAppEvent appUpdated = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED); application.handle(appUpdated); + waitForAppAttemptState(application.getApplicationId(), + RMAppAttemptState.SUBMITTED); assertAppState(RMAppState.FINISHING, application); assertTimesAtFinish(application); return application; } - protected RMApp testCreateAppFinished( + private RMApp testCreateAppFinished( ApplicationSubmissionContext submissionContext, - String diagnostics) throws IOException { + String diagnostics) throws Exception { // unmanaged AMs don't use the FINISHING state - RMApp application = null; + RMApp application; if (submissionContext != null && submissionContext.getUnmanagedAM()) { application = testCreateAppRunning(submissionContext); } else { @@ -627,7 +655,7 @@ protected RMApp testCreateAppFinished( } @Test - public void testUnmanagedApp() throws IOException { + public void testUnmanagedApp() throws Exception { ApplicationSubmissionContext subContext = new ApplicationSubmissionContextPBImpl(); subContext.setUnmanagedAM(true); @@ -659,7 +687,7 @@ public void testUnmanagedApp() throws IOException { } @Test - public void testAppSuccessPath() throws IOException { + public void testAppSuccessPath() throws Exception { LOG.info("--- START: testAppSuccessPath ---"); final String diagMsg = "some diagnostics"; RMApp application = testCreateAppFinished(null, diagMsg); @@ -1005,7 +1033,7 @@ public void testAppRunningFailed() throws IOException { } @Test - public void testAppAtFinishingIgnoreKill() throws IOException { + public void testAppAtFinishingIgnoreKill() throws Exception { LOG.info("--- START: testAppAtFinishingIgnoreKill ---"); RMApp application = testCreateAppFinishing(null); @@ -1047,7 +1075,7 @@ public void testAppFinalSavingToFinished() throws IOException { } @Test - public void testAppFinishedFinished() throws IOException { + public void testAppFinishedFinished() throws Exception { LOG.info("--- START: testAppFinishedFinished ---"); RMApp application = testCreateAppFinished(null, ""); @@ -1152,7 +1180,7 @@ public void testAppKilledKilled() throws IOException { } @Test (timeout = 30000) - public void testAppStartAfterKilled() throws IOException { + public void testAppStartAfterKilled() { LOG.info("--- START: testAppStartAfterKilled ---"); ApplicationId applicationId = MockApps.newAppID(appId++); @@ -1162,8 +1190,8 @@ public void testAppStartAfterKilled() throws IOException { @Override protected void onInvalidStateTransition(RMAppEventType rmAppEventType, RMAppState state) { - Assert.assertTrue("RMAppImpl: can't handle " + rmAppEventType - + " at state " + state, false); + Assert.fail("RMAppImpl: can't handle " + rmAppEventType + + " at state " + state); } }; @@ -1199,9 +1227,8 @@ public void testAppsRecoveringStates() throws Exception { } } - public void testRecoverApplication(ApplicationStateData appState, - RMState rmState) - throws Exception { + private void testRecoverApplication(ApplicationStateData appState, + RMState rmState) { ApplicationSubmissionContext submissionContext = appState.getApplicationSubmissionContext(); RMAppImpl application = @@ -1232,7 +1259,7 @@ public void testRecoverApplication(ApplicationStateData appState, verifyRMAppFieldsForFinalTransitions(application); } - public void createRMStateForApplications( + private void createRMStateForApplications( Map applicationState, RMAppState rmAppState) throws IOException { RMApp app = createNewTestApp(null); @@ -1248,15 +1275,67 @@ public void testGetAppReport() throws IOException { RMApp app = createNewTestApp(null); assertAppState(RMAppState.NEW, app); ApplicationReport report = app.createAndGetApplicationReport(null, true); - Assert.assertNotNull(report.getApplicationResourceUsageReport()); + assertNotNull(report.getApplicationResourceUsageReport()); Assert.assertEquals(report.getApplicationResourceUsageReport(),RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT); report = app.createAndGetApplicationReport("clientuser", true); - Assert.assertNotNull(report.getApplicationResourceUsageReport()); + assertNotNull(report.getApplicationResourceUsageReport()); Assert.assertTrue("bad proxy url for app", report.getTrackingUrl().endsWith("/proxy/" + app.getApplicationId() + "/")); } + @Test + public void testFinalTransition() throws Exception { + RMApp application = testCreateAppFinishing(null); + verifyAppBeforeFinishEvent(application); + + RMAppEvent event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_FINISHED); + application.handle(event); + rmDispatcher.await(); + + verifyAppAfterFinishEvent(application); + } + + private void verifyAppBeforeFinishEvent(RMApp app) { + assertEquals(0L, ((RMAppImpl) app).getLogAggregationStartTime()); + //RMAppEventType.APP_UPDATE_SAVED sets the finish time + assertTrue("App manager events should not be received!", + appManagerDispatcher.events.isEmpty()); + } + + private void verifyAppAfterFinishEvent(RMApp app) { + assertTrue( + testCaseStartTime < ((RMAppImpl) app).getLogAggregationStartTime()); + assertAppState(RMAppState.FINISHED, app); + verifyAppCompletedEvent(app); + verifyAppRemovedEvent(app); + } + + private void verifyAppCompletedEvent(RMApp app) { + assertEquals(1, appManagerDispatcher.events.size()); + RMAppManagerEvent rmAppManagerEvent = appManagerDispatcher.events.get(0); + assertEquals(RMAppManagerEventType.APP_COMPLETED, + rmAppManagerEvent.getType()); + assertEquals(app.getApplicationId().getId(), + rmAppManagerEvent.getApplicationId().getId()); + } + + private void verifyAppRemovedEvent(RMApp app) { + SchedulerEvent lastEvent = schedulerDispatcher.getLastEvent(); + if (!(lastEvent instanceof AppRemovedSchedulerEvent)) { + fail("Last captured event " + lastEvent + + " should be an instance of " + AppRemovedSchedulerEvent.class + .getCanonicalName()); + } + AppRemovedSchedulerEvent event = + (AppRemovedSchedulerEvent) lastEvent; + assertEquals(SchedulerEventType.APP_REMOVED, event.getType()); + assertEquals(app.getApplicationId().getId(), + event.getApplicationID().getId()); + assertEquals(RMAppState.FINISHED, event.getFinalState()); + } + private void verifyApplicationFinished(RMAppState state) { ArgumentCaptor finalState = ArgumentCaptor.forClass(RMAppState.class); @@ -1269,12 +1348,11 @@ private void verifyApplicationFinished(RMAppState state) { } private void verifyAppRemovedSchedulerEvent(RMAppState finalState) { - Assert.assertEquals(SchedulerEventType.APP_REMOVED, - schedulerDispatcher.lastSchedulerEvent.getType()); - if(schedulerDispatcher.lastSchedulerEvent instanceof - AppRemovedSchedulerEvent) { + SchedulerEvent lastEvent = schedulerDispatcher.getLastEvent(); + Assert.assertEquals(SchedulerEventType.APP_REMOVED, lastEvent.getType()); + if (lastEvent instanceof AppRemovedSchedulerEvent) { AppRemovedSchedulerEvent appRemovedEvent = - (AppRemovedSchedulerEvent) schedulerDispatcher.lastSchedulerEvent; + (AppRemovedSchedulerEvent) lastEvent; Assert.assertEquals(finalState, appRemovedEvent.getFinalState()); } } @@ -1315,4 +1393,30 @@ private void verifyRMAppFieldsForFinalTransitions(RMApp application) { assertEquals(null, application.getApplicationSubmissionContext(). getLogAggregationContext()); } + + private void waitForAppAttemptState(ApplicationId id, + RMAppAttemptState state) + throws InterruptedException { + RMApp app = rmContext.getRMApps().get(id); + Assert.assertNotNull("app shouldn't be null", app); + final int timeoutMsecs = 80 * 1000; + final int waitCycle = 10; + int timeWaiting = 0; + RMAppAttempt appAttempt = app.getCurrentAppAttempt(); + while (appAttempt == null || !state.equals(appAttempt.getState())) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("App : " + id + " State is : " + app.getState() + + " Waiting for state : " + state); + Thread.sleep(waitCycle); + timeWaiting += waitCycle; + } + + LOG.info("App State is : " + app.getState()); + Assert.assertNotNull("App attempt should not be null!", appAttempt); + Assert.assertEquals("App State is not correct (timeout).", state, + appAttempt.getState()); + } }