diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 42e2bcff468..9a770ec49b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -903,7 +903,6 @@ public int getMaxAppAttempts() { @Override public void handle(RMAppEvent event) { - this.writeLock.lock(); try { @@ -1459,8 +1458,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { } } - private static final class AppRejectedTransition extends - FinalTransition{ + private static final class AppRejectedTransition extends FinalTransition { public AppRejectedTransition() { super(RMAppState.FAILED); } @@ -1502,39 +1500,51 @@ static void appAdminClientCleanUp(RMAppImpl app) { private final RMAppState finalState; - public FinalTransition(RMAppState finalState) { + FinalTransition(RMAppState finalState) { this.finalState = finalState; } + @Override public void transition(RMAppImpl app, RMAppEvent event) { app.logAggregationStartTime = System.currentTimeMillis(); + cleanAppInRMNodes(app); + recordFinishTime(app); + removeAppFromScheduler(app); + sendEventToAppManager(app, RMAppManagerEventType.APP_COMPLETED); + handleAppFinishedWithRmContext(app); + app.clearUnusedFields(); + appAdminClientCleanUp(app); + } + + private void cleanAppInRMNodes(RMAppImpl app) { for (NodeId nodeId : app.getRanNodes()) { app.handler.handle( - new RMNodeCleanAppEvent(nodeId, app.applicationId)); + new RMNodeCleanAppEvent(nodeId, app.applicationId)); } + } + + private void recordFinishTime(RMAppImpl app) { app.finishTime = app.storedFinishTime; - if (app.finishTime == 0 ) { + if (app.finishTime == 0) { app.finishTime = app.systemClock.getTime(); } + } + + private void removeAppFromScheduler(RMAppImpl app) { // Recovered apps that are completed were not added to scheduler, so no // need to remove them from scheduler. if (app.recoveredFinalState == null) { app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, - finalState)); + finalState)); } - app.handler.handle( - new RMAppManagerEvent(app.applicationId, - RMAppManagerEventType.APP_COMPLETED)); + } + private void handleAppFinishedWithRmContext(RMAppImpl app) { app.rmContext.getRMApplicationHistoryWriter() - .applicationFinished(app, finalState); + .applicationFinished(app, finalState); app.rmContext.getSystemMetricsPublisher() - .appFinished(app, finalState, app.finishTime); - // set the memory free - app.clearUnusedFields(); - - appAdminClientCleanUp(app); - }; + .appFinished(app, finalState, app.finishTime); + } } public int getNumFailedAppAttempts() { @@ -1549,7 +1559,7 @@ public int getNumFailedAppAttempts() { return completedAttempts; } - private static final class AttemptFailedTransition implements + private static final class AttemptFailedTransition implements MultipleArcTransition { private final RMAppState initialState; @@ -1812,8 +1822,8 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { == LogAggregationStatus.TIME_OUT && report.getLogAggregationStatus() == LogAggregationStatus.RUNNING) { - // If the log aggregation status got from latest nm heartbeat - // is Running, and current log aggregation status is TimeOut, + // If the log aggregation status got from latest NM heartbeat + // is RUNNING, and current log aggregation status is TIME_OUT, // based on whether there are any failure messages for this NM, // we will reset the log aggregation status as RUNNING or // RUNNING_WITH_FAILURE @@ -2137,4 +2147,16 @@ protected void onInvalidStateTransition(RMAppEventType rmAppEventType, RMAppState state){ /* TODO fail the application on the failed transition */ } + + private static void sendEventToAppManager(RMAppImpl app, + RMAppManagerEventType event) { + app.handler.handle( + new RMAppManagerEvent(app.applicationId, event)); + } + + @VisibleForTesting + public long getLogAggregationStartTime() { + return logAggregationStartTime; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/AppCreationTestHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/AppCreationTestHelper.java new file mode 100644 index 00000000000..ad93e537f4a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/AppCreationTestHelper.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.MockApps; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.junit.Assert; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.function.Consumer; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class AppCreationTestHelper { + + private Configuration conf; + private RMContext rmContext; + private YarnScheduler scheduler; + private int appId; + private int maxAppAttempts; + private SystemMetricsPublisher publisher; + private ContainerLaunchContext containerLaunchContext; + private LogAggregationContext logAggregationContext; + private RMStateStore stateStore; + private Consumer appVerifications; + + RMApp createNewTestApp(ApplicationSubmissionContext + submissionContext) throws IOException { + ApplicationId applicationId = MockApps.newAppID(appId++); + String user = MockApps.newUserName(); + String name = MockApps.newAppName(); + String queue = MockApps.newQueue(); + // ensure max application attempts set to known value + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, maxAppAttempts); + + ApplicationMasterService masterService = + new ApplicationMasterService(rmContext, scheduler); + + if (submissionContext == null) { + submissionContext = new ApplicationSubmissionContextPBImpl(); + } + // applicationId will not be used because RMStateStore is mocked, + // but applicationId is still set for safety + submissionContext.setApplicationId(applicationId); + submissionContext.setPriority(Priority.newInstance(0)); + submissionContext.setAMContainerSpec(containerLaunchContext); + submissionContext.setLogAggregationContext(logAggregationContext); + + RMApp application = new RMAppImpl(applicationId, rmContext, conf, name, + user, queue, submissionContext, scheduler, masterService, + System.currentTimeMillis(), "YARN", null, + new ArrayList()); + + testAppStartState(applicationId, user, name, queue, application); + this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), + application); + return application; + } + + RMApp testCreateAppNewSaving( + ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = createNewTestApp(submissionContext); + // NEW => NEW_SAVING event RMAppEventType.START + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.START); + application.handle(event); + assertStartTimeSet(application); + assertAppState(RMAppState.NEW_SAVING, application); + // verify sendATSCreateEvent() is not get called during + // RMAppNewlySavingTransition. + verify(publisher, times(0)).appCreated(eq(application), anyLong()); + return application; + } + + RMApp testCreateAppSubmittedNoRecovery( + ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = testCreateAppNewSaving(submissionContext); + // NEW_SAVING => SUBMITTED event RMAppEventType.APP_NEW_SAVED + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), + RMAppEventType.APP_NEW_SAVED); + application.handle(event); + assertStartTimeSet(application); + assertAppState(RMAppState.SUBMITTED, application); + // verify sendATSCreateEvent() is get called during + // AddApplicationToSchedulerTransition. + verify(publisher).appCreated(eq(application), anyLong()); + appVerifications.accept(application); + return application; + } + + RMApp testCreateAppSubmittedRecovery( + ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = createNewTestApp(submissionContext); + // NEW => SUBMITTED event RMAppEventType.RECOVER + RMStateStore.RMState state = new RMStateStore.RMState(); + ApplicationStateData appState = + ApplicationStateData.newInstance(123, 123, null, "user", null); + state.getApplicationState().put(application.getApplicationId(), appState); + RMAppEvent event = + new RMAppRecoverEvent(application.getApplicationId(), state); + + application.handle(event); + assertStartTimeSet(application); + assertAppState(RMAppState.SUBMITTED, application); + appVerifications.accept(application); + return application; + } + + RMApp testCreateAppAccepted( + ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = testCreateAppSubmittedNoRecovery(submissionContext); + // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), + RMAppEventType.APP_ACCEPTED); + application.handle(event); + assertStartTimeSet(application); + assertAppState(RMAppState.ACCEPTED, application); + return application; + } + + RMApp testCreateAppRunning( + ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = testCreateAppAccepted(submissionContext); + // ACCEPTED => RUNNING event RMAppEventType.ATTEMPT_REGISTERED + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_REGISTERED); + application.handle(event); + assertStartTimeSet(application); + assertAppState(RMAppState.RUNNING, application); + assertFinalAppStatus(FinalApplicationStatus.UNDEFINED, application); + return application; + } + + RMApp testCreateAppFinalSaving( + ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = testCreateAppRunning(submissionContext); + RMAppEvent finishingEvent = + new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_UNREGISTERED); + application.handle(finishingEvent); + assertAppState(RMAppState.FINAL_SAVING, application); + assertAppFinalStateSaved(); + return application; + } + + RMApp testCreateAppFinishing( + ApplicationSubmissionContext submissionContext) throws IOException { + // unmanaged AMs don't use the FINISHING state + assert submissionContext == null || !submissionContext.getUnmanagedAM(); + RMApp application = testCreateAppFinalSaving(submissionContext); + // FINAL_SAVING => FINISHING event RMAppEventType.APP_UPDATED + RMAppEvent appUpdated = new RMAppEvent(application.getApplicationId(), + RMAppEventType.APP_UPDATE_SAVED); + application.handle(appUpdated); + assertAppState(RMAppState.FINISHING, application); + assertTimesAtFinish(application); + return application; + } + + RMApp testCreateAppFinished( + ApplicationSubmissionContext submissionContext, + String diagnostics) throws IOException { + // unmanaged AMs don't use the FINISHING state + RMApp application; + if (submissionContext != null && submissionContext.getUnmanagedAM()) { + application = testCreateAppRunning(submissionContext); + } else { + application = testCreateAppFinishing(submissionContext); + } + // RUNNING/FINISHING => FINISHED event RMAppEventType.ATTEMPT_FINISHED + RMAppEvent finishedEvent = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_FINISHED, diagnostics); + application.handle(finishedEvent); + assertAppState(RMAppState.FINISHED, application); + assertTimesAtFinish(application); + // finished without a proper unregister implies failed + assertFinalAppStatus(FinalApplicationStatus.FAILED, application); + Assert.assertTrue("Finished app missing diagnostics", + application.getDiagnostics().indexOf(diagnostics) != -1); + return application; + } + + static void assertAppState(RMAppState state, RMApp application) { + Assert.assertEquals("application state should have been " + state, + state, application.getState()); + } + + static void assertFinalAppStatus(FinalApplicationStatus status, RMApp + application) { + Assert.assertEquals("Final application status should have been " + status, + status, application.getFinalApplicationStatus()); + } + + // test to make sure times are set when app finishes + static void assertTimesAtFinish(RMApp application) { + assertStartTimeSet(application); + Assert.assertTrue("application finish time is not greater than 0", + (application.getFinishTime() > 0)); + Assert.assertTrue("application finish time is not >= start time", + (application.getFinishTime() >= application.getStartTime())); + } + + // test to make sure times are set when app finishes + private static void assertStartTimeSet(RMApp application) { + Assert.assertTrue("application start time is not greater than 0", + application.getStartTime() > 0); + Assert.assertTrue("application start time is before currentTime", + application.getStartTime() <= System.currentTimeMillis()); + } + + static void assertKilled(RMApp application) { + assertTimesAtFinish(application); + assertAppState(RMAppState.KILLED, application); + assertFinalAppStatus(FinalApplicationStatus.KILLED, application); + StringBuilder diag = application.getDiagnostics(); + Assert.assertEquals("application diagnostics is not correct", + "Application killed by user.", diag.toString()); + } + + static void assertFailed(RMApp application, String regex) { + assertTimesAtFinish(application); + assertAppState(RMAppState.FAILED, application); + assertFinalAppStatus(FinalApplicationStatus.FAILED, application); + StringBuilder diag = application.getDiagnostics(); + Assert.assertTrue("application diagnostics is not correct", + diag.toString().matches(regex)); + } + + void assertAppFinalStateSaved(){ + verify(stateStore, times(1)).updateApplicationState( + any(ApplicationStateData.class)); + } + + void assertAppFinalStateNotSaved() { + verify(stateStore, times(0)).updateApplicationState( + any(ApplicationStateData.class)); + } + + // Test expected newly created app state + private static void testAppStartState(ApplicationId applicationId, + String user, String name, String queue, RMApp application) { + Assert.assertTrue("application start time is not greater than 0", + application.getStartTime() > 0); + Assert.assertTrue("application start time is before currentTime", + application.getStartTime() <= System.currentTimeMillis()); + Assert.assertEquals("application user is not correct", + user, application.getUser()); + Assert.assertEquals("application id is not correct", + applicationId, application.getApplicationId()); + Assert.assertEquals("application progress is not correct", + (float)0.0, application.getProgress(), (float)0.0); + Assert.assertEquals("application queue is not correct", + queue, application.getQueue()); + Assert.assertEquals("application name is not correct", + name, application.getName()); + Assert.assertEquals("application finish time is not 0 and should be", + 0, application.getFinishTime()); + Assert.assertNull("application tracking url is not correct", + application.getTrackingUrl()); + StringBuilder diag = application.getDiagnostics(); + Assert.assertEquals("application diagnostics is not correct", + 0, diag.length()); + } + + public int getMaxAppAttempts() { + return maxAppAttempts; + } + + public int getAppId() { + return appId; + } + + public int increaseAppId() { + appId++; + return appId; + } + + public YarnScheduler getScheduler() { + return scheduler; + } + + public static final class Builder { + private int appId = 1; + private int maxAppAttempts = + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS; + private Configuration conf; + private RMContext rmContext; + private YarnScheduler scheduler; + private SystemMetricsPublisher publisher; + private ContainerLaunchContext containerLaunchContext; + private LogAggregationContext logAggregationContext; + private RMStateStore stateStore; + private Consumer appVerifications; + + private Builder() { + } + + public static Builder create() { + return new Builder(); + } + + public Builder withConf(Configuration conf) { + this.conf = conf; + return this; + } + + public Builder withRmContext(RMContext rmContext) { + this.rmContext = rmContext; + return this; + } + + public Builder withScheduler(YarnScheduler scheduler) { + this.scheduler = scheduler; + return this; + } + + public Builder withAppId(int appId) { + this.appId = appId; + return this; + } + + public Builder withMaxAppAttempts(int maxAppAttempts) { + this.maxAppAttempts = maxAppAttempts; + return this; + } + + public Builder withPublisher(SystemMetricsPublisher publisher) { + this.publisher = publisher; + return this; + } + + public Builder withContainerLaunchContext(ContainerLaunchContext + containerLaunchContext) { + this.containerLaunchContext = containerLaunchContext; + return this; + } + + public Builder withLogAggregationContext(LogAggregationContext + logAggregationContext) { + this.logAggregationContext = logAggregationContext; + return this; + } + + public Builder withRMStateStore(RMStateStore stateStore) { + this.stateStore = stateStore; + return this; + } + + public Builder withDefaultAppVerifications(Consumer verifications) { + this.appVerifications = verifications; + return this; + } + + public AppCreationTestHelper build() { + AppCreationTestHelper appCreationTestHelper = new AppCreationTestHelper(); + appCreationTestHelper.maxAppAttempts = this.maxAppAttempts; + appCreationTestHelper.appId = this.appId; + appCreationTestHelper.conf = this.conf; + appCreationTestHelper.containerLaunchContext = this + .containerLaunchContext; + appCreationTestHelper.publisher = this.publisher; + appCreationTestHelper.scheduler = this.scheduler; + appCreationTestHelper.logAggregationContext = this.logAggregationContext; + appCreationTestHelper.rmContext = this.rmContext; + appCreationTestHelper.stateStore = this.stateStore; + appCreationTestHelper.appVerifications = this.appVerifications; + return appCreationTestHelper; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 70887e0f4bf..18ef7f98167 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -18,28 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; - +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -54,7 +33,6 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -66,14 +44,12 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LogAggregationContext; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -111,6 +87,34 @@ import org.mockito.ArgumentCaptor; import org.mockito.Matchers; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static junit.framework.TestCase.assertTrue; +import static org.apache.hadoop.yarn.server.resourcemanager.rmapp.AppCreationTestHelper.assertAppState; +import static org.apache.hadoop.yarn.server.resourcemanager.rmapp.AppCreationTestHelper.assertFailed; +import static org.apache.hadoop.yarn.server.resourcemanager.rmapp.AppCreationTestHelper.assertFinalAppStatus; +import static org.apache.hadoop.yarn.server.resourcemanager.rmapp.AppCreationTestHelper.assertKilled; +import static org.apache.hadoop.yarn.server.resourcemanager.rmapp.AppCreationTestHelper.assertTimesAtFinish; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + @RunWith(value = Parameterized.class) public class TestRMAppTransitions { @@ -119,22 +123,22 @@ private boolean isSecurityEnabled; private Configuration conf; private RMContext rmContext; - private static int maxAppAttempts = - YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS; - private static int appId = 1; private DrainDispatcher rmDispatcher; private RMStateStore store; private RMApplicationHistoryWriter writer; private SystemMetricsPublisher publisher; - private YarnScheduler scheduler; + private TestSchedulerEventDispatcher schedulerDispatcher; + private TestApplicationManagerEventDispatcher appManagerDispatcher; + private AppCreationTestHelper appCreationTestHelper; + private long testCaseStartTime; // ignore all the RM application attempt events private static final class TestApplicationAttemptEventDispatcher implements EventHandler { private final RMContext rmContext; - public TestApplicationAttemptEventDispatcher(RMContext rmContext) { + TestApplicationAttemptEventDispatcher(RMContext rmContext) { this.rmContext = rmContext; } @@ -158,7 +162,7 @@ public void handle(RMAppAttemptEvent event) { EventHandler { private final RMContext rmContext; - public TestApplicationEventDispatcher(RMContext rmContext) { + TestApplicationEventDispatcher(RMContext rmContext) { this.rmContext = rmContext; } @@ -181,15 +185,18 @@ public void handle(RMAppEvent event) { // ResourceManager.java private static final class TestApplicationManagerEventDispatcher implements EventHandler { + List events = Lists.newArrayList(); @Override public void handle(RMAppManagerEvent event) { + LOG.info("Handling app manager event: " + event); + events.add(event); } } // handle all the scheduler events - same as in ResourceManager.java private static final class TestSchedulerEventDispatcher implements EventHandler { - public SchedulerEvent lastSchedulerEvent; + SchedulerEvent lastSchedulerEvent; @Override public void handle(SchedulerEvent event) { @@ -243,7 +250,7 @@ public void setUp() throws Exception { ResourceScheduler resourceScheduler = mock(ResourceScheduler.class); doReturn(null).when(resourceScheduler) - .getAppResourceUsageReport((ApplicationAttemptId)Matchers.any()); + .getAppResourceUsageReport(Matchers.any()); doReturn(resourceScheduler).when(rmContext).getScheduler(); doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext) @@ -254,9 +261,11 @@ public void setUp() throws Exception { rmDispatcher.register(RMAppEventType.class, new TestApplicationEventDispatcher(rmContext)); - + + appManagerDispatcher = new + TestApplicationManagerEventDispatcher(); rmDispatcher.register(RMAppManagerEventType.class, - new TestApplicationManagerEventDispatcher()); + appManagerDispatcher); schedulerDispatcher = new TestSchedulerEventDispatcher(); rmDispatcher.register(SchedulerEventType.class, @@ -264,15 +273,30 @@ public void setUp() throws Exception { rmDispatcher.init(conf); rmDispatcher.start(); + + appCreationTestHelper = + AppCreationTestHelper.Builder.create() + .withAppId(1) + .withConf(conf) + .withContainerLaunchContext(prepareContainerLaunchContext()) + .withLogAggregationContext(getLogAggregationContext()) + .withMaxAppAttempts( + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS) + .withRmContext(rmContext) + .withScheduler(mock(YarnScheduler.class)) + .withPublisher(publisher) + .withRMStateStore(store) + .withDefaultAppVerifications(this + ::verifyRMAppFieldsForNonFinalTransitions) + .build(); + testCaseStartTime = System.currentTimeMillis(); } private ByteBuffer getTokens() throws IOException { Credentials ts = new Credentials(); DataOutputBuffer dob = new DataOutputBuffer(); ts.writeTokenStorageToStream(dob); - ByteBuffer securityTokens = - ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - return securityTokens; + return ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); } private ByteBuffer getTokensConf() throws IOException { @@ -286,9 +310,7 @@ private ByteBuffer getTokensConf() throws IOException { appConf.set("dfs.namenode.rpc-address.mycluster3.nn2", "123.0.0.2"); appConf.write(dob); - ByteBuffer tokenConf = - ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - return tokenConf; + return ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); } private Map getLocalResources() @@ -308,13 +330,13 @@ private ByteBuffer getTokensConf() throws IOException { localRes.setTimestamp(scriptFile.lastModified()); String destinationFile = "dest_file"; Map localResources = - new HashMap(); + new HashMap<>(); localResources.put(destinationFile, localRes); return localResources; } private Map getEnvironment() { - Map userSetEnv = new HashMap(); + Map userSetEnv = new HashMap<>(); userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id"); userSetEnv.put(Environment.NM_HOST.name(), "user_set_NM_HOST"); userSetEnv.put(Environment.NM_PORT.name(), "user_set_NM_PORT"); @@ -329,15 +351,14 @@ private ByteBuffer getTokensConf() throws IOException { } private ContainerRetryContext getContainerRetryContext() { - ContainerRetryContext containerRetryContext = ContainerRetryContext + return ContainerRetryContext .newInstance( ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES, - new HashSet<>(Arrays.asList(Integer.valueOf(111))), 0, 0); - return containerRetryContext; + new HashSet<>(Arrays.asList(111)), 0, 0); } private Map getServiceData() { - Map serviceData = new HashMap(); + Map serviceData = new HashMap<>(); String serviceName = "non_exist_auxService"; serviceData.put(serviceName, ByteBuffer.wrap(serviceName.getBytes())); return serviceData; @@ -361,129 +382,12 @@ private ContainerLaunchContext prepareContainerLaunchContext() } private LogAggregationContext getLogAggregationContext() { - LogAggregationContext logAggregationContext = - LogAggregationContext.newInstance( - "includePattern", "excludePattern", - "rolledLogsIncludePattern", - "rolledLogsExcludePattern", - "policyClass", - "policyParameters"); - return logAggregationContext; - } - - protected RMApp createNewTestApp(ApplicationSubmissionContext - submissionContext) throws IOException { - ApplicationId applicationId = MockApps.newAppID(appId++); - String user = MockApps.newUserName(); - String name = MockApps.newAppName(); - String queue = MockApps.newQueue(); - // ensure max application attempts set to known value - conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, maxAppAttempts); - scheduler = mock(YarnScheduler.class); - - ApplicationMasterService masterService = - new ApplicationMasterService(rmContext, scheduler); - - if(submissionContext == null) { - submissionContext = new ApplicationSubmissionContextPBImpl(); - } - // applicationId will not be used because RMStateStore is mocked, - // but applicationId is still set for safety - submissionContext.setApplicationId(applicationId); - submissionContext.setPriority(Priority.newInstance(0)); - submissionContext.setAMContainerSpec(prepareContainerLaunchContext()); - submissionContext.setLogAggregationContext(getLogAggregationContext()); - - RMApp application = new RMAppImpl(applicationId, rmContext, conf, name, - user, queue, submissionContext, scheduler, masterService, - System.currentTimeMillis(), "YARN", null, - new ArrayList()); - - testAppStartState(applicationId, user, name, queue, application); - this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), - application); - return application; - } - - // Test expected newly created app state - private static void testAppStartState(ApplicationId applicationId, - String user, String name, String queue, RMApp application) { - Assert.assertTrue("application start time is not greater than 0", - application.getStartTime() > 0); - Assert.assertTrue("application start time is before currentTime", - application.getStartTime() <= System.currentTimeMillis()); - Assert.assertEquals("application user is not correct", - user, application.getUser()); - Assert.assertEquals("application id is not correct", - applicationId, application.getApplicationId()); - Assert.assertEquals("application progress is not correct", - (float)0.0, application.getProgress(), (float)0.0); - Assert.assertEquals("application queue is not correct", - queue, application.getQueue()); - Assert.assertEquals("application name is not correct", - name, application.getName()); - Assert.assertEquals("application finish time is not 0 and should be", - 0, application.getFinishTime()); - Assert.assertEquals("application tracking url is not correct", - null, application.getTrackingUrl()); - StringBuilder diag = application.getDiagnostics(); - Assert.assertEquals("application diagnostics is not correct", - 0, diag.length()); - } - - // test to make sure times are set when app finishes - private static void assertStartTimeSet(RMApp application) { - Assert.assertTrue("application start time is not greater than 0", - application.getStartTime() > 0); - Assert.assertTrue("application start time is before currentTime", - application.getStartTime() <= System.currentTimeMillis()); - } - - private static void assertAppState(RMAppState state, RMApp application) { - Assert.assertEquals("application state should have been " + state, - state, application.getState()); - } - - private static void assertFinalAppStatus(FinalApplicationStatus status, RMApp application) { - Assert.assertEquals("Final application status should have been " + status, - status, application.getFinalApplicationStatus()); - } - - // test to make sure times are set when app finishes - private void assertTimesAtFinish(RMApp application) { - assertStartTimeSet(application); - Assert.assertTrue("application finish time is not greater than 0", - (application.getFinishTime() > 0)); - Assert.assertTrue("application finish time is not >= start time", - (application.getFinishTime() >= application.getStartTime())); - } - - private void assertAppFinalStateSaved(RMApp application){ - verify(store, times(1)).updateApplicationState( - any(ApplicationStateData.class)); - } - - private void assertAppFinalStateNotSaved(RMApp application){ - verify(store, times(0)).updateApplicationState( - any(ApplicationStateData.class)); - } - - private void assertKilled(RMApp application) { - assertTimesAtFinish(application); - assertAppState(RMAppState.KILLED, application); - assertFinalAppStatus(FinalApplicationStatus.KILLED, application); - StringBuilder diag = application.getDiagnostics(); - Assert.assertEquals("application diagnostics is not correct", - "Application killed by user.", diag.toString()); - } - - private void assertFailed(RMApp application, String regex) { - assertTimesAtFinish(application); - assertAppState(RMAppState.FAILED, application); - assertFinalAppStatus(FinalApplicationStatus.FAILED, application); - StringBuilder diag = application.getDiagnostics(); - Assert.assertTrue("application diagnostics is not correct", - diag.toString().matches(regex)); + return LogAggregationContext.newInstance( + "includePattern", "excludePattern", + "rolledLogsIncludePattern", + "rolledLogsExcludePattern", + "policyClass", + "policyParameters"); } private void sendAppUpdateSavedEvent(RMApp application) { @@ -495,146 +399,23 @@ private void sendAppUpdateSavedEvent(RMApp application) { private void sendAttemptUpdateSavedEvent(RMApp application) { application.getCurrentAppAttempt().handle( - new RMAppAttemptEvent(application.getCurrentAppAttempt().getAppAttemptId(), + new RMAppAttemptEvent( + application.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); rmDispatcher.await(); } - protected RMApp testCreateAppNewSaving( - ApplicationSubmissionContext submissionContext) throws IOException { - RMApp application = createNewTestApp(submissionContext); - // NEW => NEW_SAVING event RMAppEventType.START - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.START); - application.handle(event); - assertStartTimeSet(application); - assertAppState(RMAppState.NEW_SAVING, application); - // verify sendATSCreateEvent() is not get called during - // RMAppNewlySavingTransition. - verify(publisher, times(0)).appCreated(eq(application), anyLong()); - return application; - } - - protected RMApp testCreateAppSubmittedNoRecovery( - ApplicationSubmissionContext submissionContext) throws IOException { - RMApp application = testCreateAppNewSaving(submissionContext); - // NEW_SAVING => SUBMITTED event RMAppEventType.APP_NEW_SAVED - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), - RMAppEventType.APP_NEW_SAVED); - application.handle(event); - assertStartTimeSet(application); - assertAppState(RMAppState.SUBMITTED, application); - // verify sendATSCreateEvent() is get called during - // AddApplicationToSchedulerTransition. - verify(publisher).appCreated(eq(application), anyLong()); - verifyRMAppFieldsForNonFinalTransitions(application); - return application; - } - - protected RMApp testCreateAppSubmittedRecovery( - ApplicationSubmissionContext submissionContext) throws IOException { - RMApp application = createNewTestApp(submissionContext); - // NEW => SUBMITTED event RMAppEventType.RECOVER - RMState state = new RMState(); - ApplicationStateData appState = - ApplicationStateData.newInstance(123, 123, null, "user", null); - state.getApplicationState().put(application.getApplicationId(), appState); - RMAppEvent event = - new RMAppRecoverEvent(application.getApplicationId(), state); - - application.handle(event); - assertStartTimeSet(application); - assertAppState(RMAppState.SUBMITTED, application); - verifyRMAppFieldsForNonFinalTransitions(application); - return application; - } - - protected RMApp testCreateAppAccepted( - ApplicationSubmissionContext submissionContext) throws IOException { - RMApp application = testCreateAppSubmittedNoRecovery(submissionContext); - // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), - RMAppEventType.APP_ACCEPTED); - application.handle(event); - assertStartTimeSet(application); - assertAppState(RMAppState.ACCEPTED, application); - return application; - } - - protected RMApp testCreateAppRunning( - ApplicationSubmissionContext submissionContext) throws IOException { - RMApp application = testCreateAppAccepted(submissionContext); - // ACCEPTED => RUNNING event RMAppEventType.ATTEMPT_REGISTERED - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), - RMAppEventType.ATTEMPT_REGISTERED); - application.handle(event); - assertStartTimeSet(application); - assertAppState(RMAppState.RUNNING, application); - assertFinalAppStatus(FinalApplicationStatus.UNDEFINED, application); - return application; - } - - protected RMApp testCreateAppFinalSaving( - ApplicationSubmissionContext submissionContext) throws IOException { - RMApp application = testCreateAppRunning(submissionContext); - RMAppEvent finishingEvent = - new RMAppEvent(application.getApplicationId(), - RMAppEventType.ATTEMPT_UNREGISTERED); - application.handle(finishingEvent); - assertAppState(RMAppState.FINAL_SAVING, application); - assertAppFinalStateSaved(application); - return application; - } - - protected RMApp testCreateAppFinishing( - ApplicationSubmissionContext submissionContext) throws IOException { - // unmanaged AMs don't use the FINISHING state - assert submissionContext == null || !submissionContext.getUnmanagedAM(); - RMApp application = testCreateAppFinalSaving(submissionContext); - // FINAL_SAVING => FINISHING event RMAppEventType.APP_UPDATED - RMAppEvent appUpdated = - new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED); - application.handle(appUpdated); - assertAppState(RMAppState.FINISHING, application); - assertTimesAtFinish(application); - return application; - } - - protected RMApp testCreateAppFinished( - ApplicationSubmissionContext submissionContext, - String diagnostics) throws IOException { - // unmanaged AMs don't use the FINISHING state - RMApp application = null; - if (submissionContext != null && submissionContext.getUnmanagedAM()) { - application = testCreateAppRunning(submissionContext); - } else { - application = testCreateAppFinishing(submissionContext); - } - // RUNNING/FINISHING => FINISHED event RMAppEventType.ATTEMPT_FINISHED - RMAppEvent finishedEvent = new RMAppEvent(application.getApplicationId(), - RMAppEventType.ATTEMPT_FINISHED, diagnostics); - application.handle(finishedEvent); - assertAppState(RMAppState.FINISHED, application); - assertTimesAtFinish(application); - // finished without a proper unregister implies failed - assertFinalAppStatus(FinalApplicationStatus.FAILED, application); - Assert.assertTrue("Finished app missing diagnostics", - application.getDiagnostics().indexOf(diagnostics) != -1); - return application; - } - @Test public void testUnmanagedApp() throws IOException { - ApplicationSubmissionContext subContext = new ApplicationSubmissionContextPBImpl(); + ApplicationSubmissionContext subContext = + new ApplicationSubmissionContextPBImpl(); subContext.setUnmanagedAM(true); // test success path LOG.info("--- START: testUnmanagedAppSuccessPath ---"); final String diagMsg = "some diagnostics"; - RMApp application = testCreateAppFinished(subContext, diagMsg); + RMApp application = + appCreationTestHelper.testCreateAppFinished(subContext, diagMsg); Assert.assertTrue("Finished app missing diagnostics", application.getDiagnostics().indexOf(diagMsg) != -1); @@ -644,9 +425,10 @@ public void testUnmanagedApp() throws IOException { // test app fails after 1 app attempt failure LOG.info("--- START: testUnmanagedAppFailPath ---"); - application = testCreateAppRunning(subContext); + application = appCreationTestHelper.testCreateAppRunning(subContext); RMAppEvent event = new RMAppFailedAttemptEvent( - application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "", false); + application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "", + false); application.handle(event); rmDispatcher.await(); RMAppAttempt appAttempt = application.getCurrentAppAttempt(); @@ -654,7 +436,7 @@ public void testUnmanagedApp() throws IOException { sendAppUpdateSavedEvent(application); assertFailed(application, ".*Unmanaged application.*Failing the application.*"); - assertAppFinalStateSaved(application); + appCreationTestHelper.assertAppFinalStateSaved(); verifyRMAppFieldsForFinalTransitions(application); } @@ -662,7 +444,8 @@ public void testUnmanagedApp() throws IOException { public void testAppSuccessPath() throws IOException { LOG.info("--- START: testAppSuccessPath ---"); final String diagMsg = "some diagnostics"; - RMApp application = testCreateAppFinished(null, diagMsg); + RMApp application = + appCreationTestHelper.testCreateAppFinished(null, diagMsg); Assert.assertTrue("Finished application missing diagnostics", application.getDiagnostics().indexOf(diagMsg) != -1); verifyRMAppFieldsForFinalTransitions(application); @@ -674,7 +457,7 @@ public void testAppRecoverPath() throws IOException { ApplicationSubmissionContext sub = Records.newRecord(ApplicationSubmissionContext.class); sub.setAMContainerSpec(prepareContainerLaunchContext()); - testCreateAppSubmittedRecovery(sub); + appCreationTestHelper.testCreateAppSubmittedRecovery(sub); } @Test (timeout = 30000) @@ -684,7 +467,7 @@ public void testAppNewKill() throws IOException { UserGroupInformation fooUser = UserGroupInformation.createUserForTesting( "fooTestAppNewKill", new String[] {"foo_group"}); - RMApp application = createNewTestApp(null); + RMApp application = appCreationTestHelper.createNewTestApp(null); // NEW => KILLED event RMAppEventType.KILL RMAppEvent event = new RMAppKillByClientEvent( application.getApplicationId(), "Application killed by user.", fooUser, @@ -693,7 +476,7 @@ public void testAppNewKill() throws IOException { rmDispatcher.await(); sendAppUpdateSavedEvent(application); assertKilled(application); - assertAppFinalStateNotSaved(application); + appCreationTestHelper.assertAppFinalStateNotSaved(); verifyApplicationFinished(RMAppState.KILLED); verifyAppRemovedSchedulerEvent(RMAppState.KILLED); verifyRMAppFieldsForFinalTransitions(application); @@ -703,7 +486,7 @@ public void testAppNewKill() throws IOException { public void testAppNewReject() throws IOException { LOG.info("--- START: testAppNewReject ---"); - RMApp application = createNewTestApp(null); + RMApp application = appCreationTestHelper.createNewTestApp(null); // NEW => FAILED event RMAppEventType.APP_REJECTED String rejectedText = "Test Application Rejected"; RMAppEvent event = new RMAppEvent(application.getApplicationId(), @@ -712,7 +495,7 @@ public void testAppNewReject() throws IOException { rmDispatcher.await(); sendAppUpdateSavedEvent(application); assertFailed(application, rejectedText); - assertAppFinalStateSaved(application); + appCreationTestHelper.assertAppFinalStateSaved(); verifyApplicationFinished(RMAppState.FAILED); verifyRMAppFieldsForFinalTransitions(application); } @@ -721,7 +504,7 @@ public void testAppNewReject() throws IOException { public void testAppNewRejectAddToStore() throws IOException { LOG.info("--- START: testAppNewRejectAddToStore ---"); - RMApp application = createNewTestApp(null); + RMApp application = appCreationTestHelper.createNewTestApp(null); // NEW => FAILED event RMAppEventType.APP_REJECTED String rejectedText = "Test Application Rejected"; RMAppEvent event = new RMAppEvent(application.getApplicationId(), @@ -730,7 +513,7 @@ public void testAppNewRejectAddToStore() throws IOException { rmDispatcher.await(); sendAppUpdateSavedEvent(application); assertFailed(application, rejectedText); - assertAppFinalStateSaved(application); + appCreationTestHelper.assertAppFinalStateSaved(); verifyApplicationFinished(RMAppState.FAILED); verifyRMAppFieldsForFinalTransitions(application); rmContext.getStateStore().removeApplication(application); @@ -740,7 +523,7 @@ public void testAppNewRejectAddToStore() throws IOException { public void testAppNewSavingKill() throws IOException { LOG.info("--- START: testAppNewSavingKill ---"); - RMApp application = testCreateAppNewSaving(null); + RMApp application = appCreationTestHelper.testCreateAppNewSaving(null); // NEW_SAVING => KILLED event RMAppEventType.KILL UserGroupInformation fooUser = UserGroupInformation.createUserForTesting( "fooTestAppNewSavingKill", new String[] {"foo_group"}); @@ -762,7 +545,7 @@ public void testAppNewSavingKill() throws IOException { public void testAppNewSavingReject() throws IOException { LOG.info("--- START: testAppNewSavingReject ---"); - RMApp application = testCreateAppNewSaving(null); + RMApp application = appCreationTestHelper.testCreateAppNewSaving(null); // NEW_SAVING => FAILED event RMAppEventType.APP_REJECTED String rejectedText = "Test Application Rejected"; RMAppEvent event = new RMAppEvent(application.getApplicationId(), @@ -771,7 +554,7 @@ public void testAppNewSavingReject() throws IOException { rmDispatcher.await(); sendAppUpdateSavedEvent(application); assertFailed(application, rejectedText); - assertAppFinalStateSaved(application); + appCreationTestHelper.assertAppFinalStateSaved(); verifyApplicationFinished(RMAppState.FAILED); verifyRMAppFieldsForFinalTransitions(application); } @@ -779,7 +562,7 @@ public void testAppNewSavingReject() throws IOException { @Test (timeout = 30000) public void testAppNewSavingSaveReject() throws IOException { LOG.info("--- START: testAppNewSavingSaveReject ---"); - RMApp application = testCreateAppNewSaving(null); + RMApp application = appCreationTestHelper.testCreateAppNewSaving(null); // NEW_SAVING => FAILED event RMAppEventType.APP_SAVE_FAILED String rejectedText = "Test Application Rejected"; RMAppEvent event = new RMAppEvent(application.getApplicationId(), @@ -797,7 +580,8 @@ public void testAppNewSavingSaveReject() throws IOException { public void testAppSubmittedRejected() throws IOException { LOG.info("--- START: testAppSubmittedRejected ---"); - RMApp application = testCreateAppSubmittedNoRecovery(null); + RMApp application = + appCreationTestHelper.testCreateAppSubmittedNoRecovery(null); // SUBMITTED => FAILED event RMAppEventType.APP_REJECTED String rejectedText = "app rejected"; RMAppEvent event = new RMAppEvent(application.getApplicationId(), @@ -806,15 +590,16 @@ public void testAppSubmittedRejected() throws IOException { rmDispatcher.await(); sendAppUpdateSavedEvent(application); assertFailed(application, rejectedText); - assertAppFinalStateSaved(application); + appCreationTestHelper.assertAppFinalStateSaved(); verifyApplicationFinished(RMAppState.FAILED); verifyRMAppFieldsForFinalTransitions(application); } @Test - public void testAppSubmittedKill() throws IOException, InterruptedException { + public void testAppSubmittedKill() throws IOException { LOG.info("--- START: testAppSubmittedKill---"); - RMApp application = testCreateAppSubmittedNoRecovery(null); + RMApp application = + appCreationTestHelper.testCreateAppSubmittedNoRecovery(null); UserGroupInformation fooUser = UserGroupInformation.createUserForTesting( "fooTestAppSubmittedKill", new String[] {"foo_group"}); @@ -828,7 +613,7 @@ public void testAppSubmittedKill() throws IOException, InterruptedException { rmDispatcher.await(); sendAppUpdateSavedEvent(application); assertKilled(application); - assertAppFinalStateSaved(application); + appCreationTestHelper.assertAppFinalStateSaved(); verifyApplicationFinished(RMAppState.KILLED); verifyAppRemovedSchedulerEvent(RMAppState.KILLED); verifyRMAppFieldsForFinalTransitions(application); @@ -838,10 +623,10 @@ public void testAppSubmittedKill() throws IOException, InterruptedException { public void testAppAcceptedFailed() throws IOException { LOG.info("--- START: testAppAcceptedFailed ---"); - RMApp application = testCreateAppAccepted(null); + RMApp application = appCreationTestHelper.testCreateAppAccepted(null); // ACCEPTED => ACCEPTED event RMAppEventType.RMAppEventType.ATTEMPT_FAILED - Assert.assertTrue(maxAppAttempts > 1); - for (int i=1; i < maxAppAttempts; i++) { + Assert.assertTrue(appCreationTestHelper.getMaxAppAttempts() > 1); + for (int i = 1; i < appCreationTestHelper.getMaxAppAttempts(); i++) { RMAppEvent event = new RMAppFailedAttemptEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "", false); @@ -865,14 +650,14 @@ public void testAppAcceptedFailed() throws IOException { rmDispatcher.await(); sendAppUpdateSavedEvent(application); assertFailed(application, ".*" + message + ".*Failing the application.*"); - assertAppFinalStateSaved(application); + appCreationTestHelper.assertAppFinalStateSaved(); verifyApplicationFinished(RMAppState.FAILED); } @Test - public void testAppAcceptedKill() throws IOException, InterruptedException { + public void testAppAcceptedKill() throws IOException { LOG.info("--- START: testAppAcceptedKill ---"); - RMApp application = testCreateAppAccepted(null); + RMApp application = appCreationTestHelper.testCreateAppAccepted(null); // ACCEPTED => KILLED event RMAppEventType.KILL UserGroupInformation fooUser = UserGroupInformation.createUserForTesting( "fooTestAppAcceptedKill", new String[] {"foo_group"}); @@ -892,17 +677,16 @@ public void testAppAcceptedKill() throws IOException, InterruptedException { assertAppState(RMAppState.FINAL_SAVING, application); sendAppUpdateSavedEvent(application); assertKilled(application); - assertAppFinalStateSaved(application); + appCreationTestHelper.assertAppFinalStateSaved(); verifyApplicationFinished(RMAppState.KILLED); verifyAppRemovedSchedulerEvent(RMAppState.KILLED); verifyRMAppFieldsForFinalTransitions(application); } @Test - public void testAppAcceptedAttemptKilled() throws IOException, - InterruptedException { + public void testAppAcceptedAttemptKilled() throws IOException { LOG.info("--- START: testAppAcceptedAttemptKilled ---"); - RMApp application = testCreateAppAccepted(null); + RMApp application = appCreationTestHelper.testCreateAppAccepted(null); // ACCEPTED => FINAL_SAVING event RMAppEventType.ATTEMPT_KILLED // When application recovery happens for attempt is KILLED but app is @@ -916,7 +700,7 @@ public void testAppAcceptedAttemptKilled() throws IOException, assertAppState(RMAppState.FINAL_SAVING, application); sendAppUpdateSavedEvent(application); assertKilled(application); - assertAppFinalStateSaved(application); + appCreationTestHelper.assertAppFinalStateSaved(); verifyApplicationFinished(RMAppState.KILLED); verifyAppRemovedSchedulerEvent(RMAppState.KILLED); } @@ -925,7 +709,7 @@ public void testAppAcceptedAttemptKilled() throws IOException, public void testAppRunningKill() throws IOException { LOG.info("--- START: testAppRunningKill ---"); - RMApp application = testCreateAppRunning(null); + RMApp application = appCreationTestHelper.testCreateAppRunning(null); // RUNNING => KILLED event RMAppEventType.KILL UserGroupInformation fooUser = UserGroupInformation.createUserForTesting( "fooTestAppRunningKill", new String[] {"foo_group"}); @@ -950,14 +734,14 @@ public void testAppRunningKill() throws IOException { public void testAppRunningFailed() throws IOException { LOG.info("--- START: testAppRunningFailed ---"); - RMApp application = testCreateAppRunning(null); + RMApp application = appCreationTestHelper.testCreateAppRunning(null); RMAppAttempt appAttempt = application.getCurrentAppAttempt(); int expectedAttemptId = 1; Assert.assertEquals(expectedAttemptId, appAttempt.getAppAttemptId().getAttemptId()); // RUNNING => FAILED/RESTARTING event RMAppEventType.ATTEMPT_FAILED - Assert.assertTrue(maxAppAttempts > 1); - for (int i=1; i 1); + for (int i = 1; i < appCreationTestHelper.getMaxAppAttempts(); i++) { RMAppEvent event = new RMAppFailedAttemptEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "", false); @@ -990,7 +774,7 @@ public void testAppRunningFailed() throws IOException { rmDispatcher.await(); sendAppUpdateSavedEvent(application); assertFailed(application, ".*Failing the application.*"); - assertAppFinalStateSaved(application); + appCreationTestHelper.assertAppFinalStateSaved(); // FAILED => FAILED event RMAppEventType.KILL event = @@ -999,7 +783,7 @@ public void testAppRunningFailed() throws IOException { application.handle(event); rmDispatcher.await(); assertFailed(application, ".*Failing the application.*"); - assertAppFinalStateSaved(application); + appCreationTestHelper.assertAppFinalStateSaved(); verifyApplicationFinished(RMAppState.FAILED); verifyRMAppFieldsForFinalTransitions(application); } @@ -1008,7 +792,7 @@ public void testAppRunningFailed() throws IOException { public void testAppAtFinishingIgnoreKill() throws IOException { LOG.info("--- START: testAppAtFinishingIgnoreKill ---"); - RMApp application = testCreateAppFinishing(null); + RMApp application = appCreationTestHelper.testCreateAppFinishing(null); // FINISHING => FINISHED event RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, @@ -1026,7 +810,7 @@ public void testAppAtFinishingIgnoreKill() throws IOException { public void testAppFinalSavingToFinished() throws IOException { LOG.info("--- START: testAppFinalSavingToFinished ---"); - RMApp application = testCreateAppFinalSaving(null); + RMApp application = appCreationTestHelper.testCreateAppFinalSaving(null); final String diagMsg = "some diagnostics"; // attempt_finished event comes before attempt_saved event RMAppEvent event = new RMAppEvent(application.getApplicationId(), @@ -1034,7 +818,8 @@ public void testAppFinalSavingToFinished() throws IOException { application.handle(event); assertAppState(RMAppState.FINAL_SAVING, application); RMAppEvent appUpdated = - new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED); + new RMAppEvent(application.getApplicationId(), + RMAppEventType.APP_UPDATE_SAVED); application.handle(appUpdated); assertAppState(RMAppState.FINISHED, application); @@ -1050,7 +835,7 @@ public void testAppFinalSavingToFinished() throws IOException { public void testAppFinishedFinished() throws IOException { LOG.info("--- START: testAppFinishedFinished ---"); - RMApp application = testCreateAppFinished(null, ""); + RMApp application = appCreationTestHelper.testCreateAppFinished(null, ""); // FINISHED => FINISHED event RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, @@ -1070,7 +855,7 @@ public void testAppFinishedFinished() throws IOException { public void testAppFailedFailed() throws IOException { LOG.info("--- START: testAppFailedFailed ---"); - RMApp application = testCreateAppNewSaving(null); + RMApp application = appCreationTestHelper.testCreateAppNewSaving(null); // NEW_SAVING => FAILED event RMAppEventType.APP_REJECTED RMAppEvent event = new RMAppEvent(application.getApplicationId(), @@ -1100,7 +885,7 @@ public void testAppFailedFailed() throws IOException { public void testAppKilledKilled() throws IOException { LOG.info("--- START: testAppKilledKilled ---"); - RMApp application = testCreateAppRunning(null); + RMApp application = appCreationTestHelper.testCreateAppRunning(null); // RUNNING => KILLED event RMAppEventType.KILL UserGroupInformation fooUser = UserGroupInformation.createUserForTesting( @@ -1152,18 +937,19 @@ public void testAppKilledKilled() throws IOException { } @Test (timeout = 30000) - public void testAppStartAfterKilled() throws IOException { + public void testAppStartAfterKilled() { LOG.info("--- START: testAppStartAfterKilled ---"); - ApplicationId applicationId = MockApps.newAppID(appId++); + ApplicationId applicationId = + MockApps.newAppID(appCreationTestHelper.increaseAppId()); RMApp application = new RMAppImpl(applicationId, rmContext, conf, null, null, null, new ApplicationSubmissionContextPBImpl(), null, null, System.currentTimeMillis(), "YARN", null, null) { @Override protected void onInvalidStateTransition(RMAppEventType rmAppEventType, RMAppState state) { - Assert.assertTrue("RMAppImpl: can't handle " + rmAppEventType - + " at state " + state, false); + Assert.fail("RMAppImpl: can't handle " + rmAppEventType + + " at state " + state); } }; @@ -1199,9 +985,8 @@ public void testAppsRecoveringStates() throws Exception { } } - public void testRecoverApplication(ApplicationStateData appState, - RMState rmState) - throws Exception { + private void testRecoverApplication(ApplicationStateData appState, + RMState rmState) { ApplicationSubmissionContext submissionContext = appState.getApplicationSubmissionContext(); RMAppImpl application = @@ -1209,7 +994,8 @@ public void testRecoverApplication(ApplicationStateData appState, appState.getApplicationSubmissionContext().getApplicationId(), rmContext, conf, submissionContext.getApplicationName(), null, - submissionContext.getQueue(), submissionContext, scheduler, null, + submissionContext.getQueue(), submissionContext, + appCreationTestHelper.getScheduler(), null, appState.getSubmitTime(), submissionContext.getApplicationType(), submissionContext.getApplicationTags(), Collections.singletonList(BuilderUtils.newResourceRequest( @@ -1232,12 +1018,12 @@ public void testRecoverApplication(ApplicationStateData appState, verifyRMAppFieldsForFinalTransitions(application); } - public void createRMStateForApplications( - Map applicationState, - RMAppState rmAppState) throws IOException { - RMApp app = createNewTestApp(null); - ApplicationStateData appState = - ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(), + private void createRMStateForApplications( + Map applicationState, + RMAppState rmAppState) throws IOException { + RMApp app = appCreationTestHelper.createNewTestApp(null); + ApplicationStateData appState = ApplicationStateData.newInstance( + app.getSubmitTime(), app.getStartTime(), app.getUser(), app.getApplicationSubmissionContext(), rmAppState, null, app.getLaunchTime(), app.getFinishTime(), null); applicationState.put(app.getApplicationId(), appState); @@ -1245,11 +1031,12 @@ public void createRMStateForApplications( @Test public void testGetAppReport() throws IOException { - RMApp app = createNewTestApp(null); + RMApp app = appCreationTestHelper.createNewTestApp(null); assertAppState(RMAppState.NEW, app); ApplicationReport report = app.createAndGetApplicationReport(null, true); Assert.assertNotNull(report.getApplicationResourceUsageReport()); - Assert.assertEquals(report.getApplicationResourceUsageReport(),RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT); + Assert.assertEquals(report.getApplicationResourceUsageReport(), + RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT); report = app.createAndGetApplicationReport("clientuser", true); Assert.assertNotNull(report.getApplicationResourceUsageReport()); Assert.assertTrue("bad proxy url for app", @@ -1257,6 +1044,59 @@ public void testGetAppReport() throws IOException { + "/")); } + @Test + public void testFinalTransition() throws IOException { + RMApp application = appCreationTestHelper.testCreateAppFinishing(null); + verifyAppBeforeFinishEvent(application); + + RMAppEvent event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_FINISHED); + application.handle(event); + rmDispatcher.await(); + + verifyAppAfterFinishEvent(application); + } + + private void verifyAppBeforeFinishEvent(RMApp app) { + assertEquals(0L, ((RMAppImpl) app).getLogAggregationStartTime()); + //RMAppEventType.APP_UPDATE_SAVED sets the finish time + assertTrue(app.getFinishTime() > testCaseStartTime); + + assertTrue("App manager events should not be received!", + appManagerDispatcher.events.isEmpty()); + } + + private void verifyAppAfterFinishEvent(RMApp app) { + assertTrue( + testCaseStartTime < ((RMAppImpl) app).getLogAggregationStartTime()); + assertTrue(testCaseStartTime < app.getFinishTime()); + verifyAppCompletedEvent(app); + verifyAppRemovedEvent(app); + } + + private void verifyAppCompletedEvent(RMApp app) { + assertEquals(1, appManagerDispatcher.events.size()); + RMAppManagerEvent rmAppManagerEvent = appManagerDispatcher.events.get(0); + assertEquals(RMAppManagerEventType.APP_COMPLETED, + rmAppManagerEvent.getType()); + assertEquals(app.getApplicationId().getId(), + rmAppManagerEvent.getApplicationId().getId()); + } + + private void verifyAppRemovedEvent(RMApp app) { + SchedulerEvent lastSchedulerEvent = schedulerDispatcher.lastSchedulerEvent; + if (!(lastSchedulerEvent instanceof AppRemovedSchedulerEvent)) { + fail("First captured event " + lastSchedulerEvent + + " should be an instance of " + "AppRemovedSchedulerEvent"); + } + AppRemovedSchedulerEvent event = + (AppRemovedSchedulerEvent) lastSchedulerEvent; + assertEquals(SchedulerEventType.APP_REMOVED, event.getType()); + assertEquals(app.getApplicationId().getId(), + event.getApplicationID().getId()); + assertEquals(RMAppState.FINISHED, event.getFinalState()); + } + private void verifyApplicationFinished(RMAppState state) { ArgumentCaptor finalState = ArgumentCaptor.forClass(RMAppState.class); @@ -1279,21 +1119,24 @@ private void verifyAppRemovedSchedulerEvent(RMAppState finalState) { } } - private void verifyRMAppFieldsForNonFinalTransitions(RMApp application) - throws IOException { + private void verifyRMAppFieldsForNonFinalTransitions(RMApp application) { assertEquals(Arrays.asList("/bin/sleep 5"), application.getApplicationSubmissionContext(). getAMContainerSpec().getCommands()); - assertEquals(getLocalResources(), - application.getApplicationSubmissionContext(). - getAMContainerSpec().getLocalResources()); - if(UserGroupInformation.isSecurityEnabled()) { - assertEquals(getTokens(), + try { + assertEquals(getLocalResources(), application.getApplicationSubmissionContext(). - getAMContainerSpec().getTokens()); - assertEquals(getTokensConf(), - application.getApplicationSubmissionContext(). - getAMContainerSpec().getTokensConf()); + getAMContainerSpec().getLocalResources()); + if(UserGroupInformation.isSecurityEnabled()) { + assertEquals(getTokens(), + application.getApplicationSubmissionContext(). + getAMContainerSpec().getTokens()); + assertEquals(getTokensConf(), + application.getApplicationSubmissionContext(). + getAMContainerSpec().getTokensConf()); + } + } catch (IOException e) { + throw new RuntimeException(e); } assertEquals(getEnvironment(), application.getApplicationSubmissionContext().