diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 9f1ea4403f4..92ec0dc0c63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -903,7 +903,6 @@ public int getMaxAppAttempts() { @Override public void handle(RMAppEvent event) { - this.writeLock.lock(); try { @@ -1459,8 +1458,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { } } - private static final class AppRejectedTransition extends - FinalTransition{ + private static final class AppRejectedTransition extends FinalTransition { public AppRejectedTransition() { super(RMAppState.FAILED); } @@ -1506,35 +1504,55 @@ public FinalTransition(RMAppState finalState) { this.finalState = finalState; } + @Override public void transition(RMAppImpl app, RMAppEvent event) { - app.logAggregationStartTime = System.currentTimeMillis(); + recordLogAggregationStartTime(app); + cleanAppInRMNodes(app); + recordFinishTime(app); + removeAppFromScheduler(app); + boolean logAggregationNotRequired = !app.logAggregationEnabled || app + .isLogAggregationFinished(); + if (logAggregationNotRequired) { + sendEventToAppManager(app, RMAppManagerEventType.APP_COMPLETED); + } + handleAppFinishedWithRmContext(app); + app.clearUnusedFields(); + appAdminClientCleanUp(app); + } + + private void cleanAppInRMNodes(RMAppImpl app) { for (NodeId nodeId : app.getRanNodes()) { app.handler.handle( - new RMNodeCleanAppEvent(nodeId, app.applicationId)); + new RMNodeCleanAppEvent(nodeId, app.applicationId)); } + } + + private void recordLogAggregationStartTime(RMAppImpl app) { + app.logAggregationStartTime = System.currentTimeMillis(); + } + + private void recordFinishTime(RMAppImpl app) { app.finishTime = app.storedFinishTime; - if (app.finishTime == 0 ) { + if (app.finishTime == 0) { app.finishTime = app.systemClock.getTime(); } + } + + private void removeAppFromScheduler(RMAppImpl app) { // Recovered apps that are completed were not added to scheduler, so no // need to remove them from scheduler. if (app.recoveredFinalState == null) { app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, - finalState)); + finalState)); } - app.handler.handle( - new RMAppManagerEvent(app.applicationId, - RMAppManagerEventType.APP_COMPLETED)); + } + private void handleAppFinishedWithRmContext(RMAppImpl app) { app.rmContext.getRMApplicationHistoryWriter() - .applicationFinished(app, finalState); + .applicationFinished(app, finalState); app.rmContext.getSystemMetricsPublisher() - .appFinished(app, finalState, app.finishTime); - // set the memory free - app.clearUnusedFields(); - - appAdminClientCleanUp(app); - }; + .appFinished(app, finalState, app.finishTime); + } } public int getNumFailedAppAttempts() { @@ -1549,7 +1567,7 @@ public int getNumFailedAppAttempts() { return completedAttempts; } - private static final class AttemptFailedTransition implements + private static final class AttemptFailedTransition implements MultipleArcTransition { private final RMAppState initialState; @@ -1812,8 +1830,8 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { == LogAggregationStatus.TIME_OUT && report.getLogAggregationStatus() == LogAggregationStatus.RUNNING) { - // If the log aggregation status got from latest nm heartbeat - // is Running, and current log aggregation status is TimeOut, + // If the log aggregation status got from latest NM heartbeat + // is RUNNING, and current log aggregation status is TIME_OUT, // based on whether there are any failure messages for this NM, // we will reset the log aggregation status as RUNNING or // RUNNING_WITH_FAILURE @@ -1830,6 +1848,7 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { updateLogAggregationDiagnosticMessages(nodeId, report); if (isAppInFinalState(this) && stateChangedToFinal) { updateLogAggregationStatus(nodeId); + sendEventToAppManager(this, RMAppManagerEventType.APP_COMPLETED); } } } finally { @@ -2131,4 +2150,10 @@ protected void onInvalidStateTransition(RMAppEventType rmAppEventType, RMAppState state){ /* TODO fail the application on the failed transition */ } + + private static void sendEventToAppManager(RMAppImpl app, + RMAppManagerEventType event) { + app.handler.handle( + new RMAppManagerEvent(app.applicationId, event)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImplTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImplTest.java new file mode 100644 index 00000000000..a915fdeb766 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImplTest.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; +import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; +import org.apache.hadoop.yarn.state.StateMachine; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.List; +import java.util.Set; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class RMAppImplTest { + private EventHandler eventHandler; + private RMContext rmContext; + private long testCaseStartTime; + private ApplicationId appId; + private int appIdentifier = 1111; + + private static void setRMAppCurrentState(RMApp rmApp, RMAppState state) { + try { + Field stateMachineField = RMAppImpl.class.getDeclaredField( + "stateMachine"); + stateMachineField.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(stateMachineField, + stateMachineField.getModifiers() & ~Modifier.FINAL); + + StateMachine stateMachine = (StateMachine) stateMachineField.get(rmApp); + + Field currentStateField = stateMachine.getClass().getDeclaredField( + "currentState"); + currentStateField.setAccessible(true); + currentStateField.set(stateMachine, state); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Object getFieldValue(RMApp rmApp, String fieldName) { + Field f; + try { + f = rmApp.getClass().getDeclaredField(fieldName); + f.setAccessible(true); + return f.get(rmApp); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void setFieldValue(RMApp rmApp, String fieldName, Object value) { + Field f; + try { + f = rmApp.getClass().getDeclaredField(fieldName); + f.setAccessible(true); + f.set(rmApp, value); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + private void recordTestCaseStartTime() { + testCaseStartTime = System.currentTimeMillis(); + } + + private List captureEventHandlerArguments(int expected) { + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass( + AbstractEvent.class); + verify(eventHandler, times(expected)).handle(argumentCaptor.capture()); + return argumentCaptor.getAllValues(); + } + + private void verifyAppRemovedEvent(AbstractEvent e) { + if (!(e instanceof AppRemovedSchedulerEvent)) { + fail("First captured event " + e + " should be an instance of " + + "AppRemovedSchedulerEvent"); + } + AppRemovedSchedulerEvent event = (AppRemovedSchedulerEvent) e; + assertEquals(appId, event.getApplicationID()); + assertEquals(RMAppState.FINISHED, event.getFinalState()); + } + + private void verifyAppManagerEvent(AbstractEvent e) { + if (!(e instanceof RMAppManagerEvent)) { + fail("First captured event " + e + " should be an instance of " + + "AppRemovedSchedulerEvent"); + } + RMAppManagerEvent event = (RMAppManagerEvent) e; + assertEquals(appId, event.getApplicationId()); + assertEquals(RMAppManagerEventType.APP_COMPLETED, event.getType()); + } + + private Dispatcher createDispatcher() { + eventHandler = mock(EventHandler.class); + + Dispatcher dispatcher = mock(Dispatcher.class); + when(dispatcher.getEventHandler()).thenReturn(eventHandler); + return dispatcher; + } + + private RMContext createRmContext(Dispatcher dispatcher) { + final RMContext rmContext = mock(RMContext.class); + RMApplicationHistoryWriter mockAppHistoryWriter = mock( + RMApplicationHistoryWriter.class); + SystemMetricsPublisher mockSystemMetricsPublisher = mock( + SystemMetricsPublisher.class); + + when(rmContext.getDispatcher()).thenReturn(dispatcher); + when(rmContext.getRMApplicationHistoryWriter()).thenReturn( + mockAppHistoryWriter); + when(rmContext.getSystemMetricsPublisher()).thenReturn( + mockSystemMetricsPublisher); + return rmContext; + } + + private Configuration createConfigWithLogAggregationDisabled() { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); + return conf; + } + + private Configuration createConfigWithLogAggregationEnabled() { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + return conf; + } + + private RMAppImpl createRmAppWithState(Configuration conf, RMAppState state) { + long clusterTimestamp = 1234L; + appId = ApplicationId.newInstance(clusterTimestamp, appIdentifier++); + String rmAppName = "app1"; + String user = "testUser"; + String queue = "testQueue"; + ApplicationSubmissionContext mockAppSubmissionContext = mock( + ApplicationSubmissionContext.class); + YarnScheduler scheduler = null; + ApplicationMasterService appMasterService = null; + long submitTime = 12345L; + String appType = "testApplication"; + Set appTags = Sets.newHashSet(); + List amReqs = Lists.newArrayList(); + RMAppImpl rmApp = new RMAppImpl(appId, this.rmContext, conf, rmAppName, + user, queue, mockAppSubmissionContext, scheduler, + appMasterService, submitTime, appType, appTags, amReqs); + setRMAppCurrentState(rmApp, state); + + return rmApp; + } + + private void aggregateLogReportWithAggregationStatus(RMAppImpl rmApp, + LogAggregationStatus status) { + rmApp.aggregateLogReport(NodeId.newInstance("host", 1111), + LogAggregationReport.newInstance(appId, status, "diagMessage")); + } + + @Before + public void setUp() { + recordTestCaseStartTime(); + } + + @Test + public void testFinalTransitionIfLogAggregationDisabled() { + Dispatcher dispatcher = createDispatcher(); + rmContext = createRmContext(dispatcher); + + Configuration conf = createConfigWithLogAggregationDisabled(); + RMAppImpl rmApp = createRmAppWithState(conf, RMAppState.FINISHING); + + verifyAppBeforeFinishEvent(rmApp); + + RMAppEvent event = new RMAppEvent(appId, RMAppEventType.ATTEMPT_FINISHED); + rmApp.handle(event); + + verifyAppAfterFinishEvent(rmApp, 2); + } + + @Test + public void testFinalTransitionIfLogAggregationEnabledAndFinished() { + Dispatcher dispatcher = createDispatcher(); + rmContext = createRmContext(dispatcher); + + Configuration conf = createConfigWithLogAggregationEnabled(); + RMAppImpl rmApp = createRmAppWithState(conf, RMAppState.FINISHING); + setFieldValue(rmApp, "currentAttempt", mock(RMAppAttempt.class)); + + verifyAppBeforeFinishEvent(rmApp); + + RMAppEvent event = new RMAppEvent(appId, RMAppEventType.ATTEMPT_FINISHED); + rmApp.handle(event); + aggregateLogReportWithAggregationStatus(rmApp, + LogAggregationStatus.SUCCEEDED); + + verifyAppAfterFinishEvent(rmApp, 2); + } + + @Test + public void testFinalTransitionIfLogAggregationEnabledAndNotFinished() { + eventHandler = mock(EventHandler.class); + Dispatcher dispatcher = createDispatcher(); + rmContext = createRmContext(dispatcher); + + Configuration conf = createConfigWithLogAggregationEnabled(); + RMAppImpl rmApp = createRmAppWithState(conf, RMAppState.FINISHING); + + verifyAppBeforeFinishEvent(rmApp); + + RMAppEvent event = new RMAppEvent(appId, RMAppEventType.ATTEMPT_FINISHED); + rmApp.handle(event); + + //app should be in the same state as before ATTEMPT_FINISHED event was + // handled + verifyAppAfterFinishEvent(rmApp, 1); + } + + @Test + public void testLogAggregationDoesNotSendAppCompletedEventIfStateIsNotFinal() { + Dispatcher dispatcher = createDispatcher(); + rmContext = createRmContext(dispatcher); + + verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.NEW); + verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.NEW_SAVING); + verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.SUBMITTED); + verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.ACCEPTED); + verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.RUNNING); + verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.FINAL_SAVING); + verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.FINISHING); + verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.KILLING); + } + + @Test + public void testLogAggregationSendsAppCompletedEventIfStateIsFinal() { + Dispatcher dispatcher = createDispatcher(); + rmContext = createRmContext(dispatcher); + + verifyLogAggregationSendsAppCompletedEvent(RMAppState.FINISHED); + reset(eventHandler); + + verifyLogAggregationSendsAppCompletedEvent(RMAppState.KILLED); + reset(eventHandler); + + verifyLogAggregationSendsAppCompletedEvent(RMAppState.FAILED); + } + + private void verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState + rmAppState) { + Configuration conf = createConfigWithLogAggregationEnabled(); + RMAppImpl rmApp = createRmAppWithState(conf, rmAppState); + verifyAppBeforeFinishEvent(rmApp); + aggregateLogReportWithAggregationStatus(rmApp, + LogAggregationStatus.SUCCEEDED); + captureEventHandlerArguments(0); + } + + private void verifyLogAggregationSendsAppCompletedEvent(RMAppState + rmAppState) { + Configuration conf = createConfigWithLogAggregationEnabled(); + RMAppImpl rmApp = createRmAppWithState(conf, rmAppState); + + verifyAppBeforeFinishEvent(rmApp); + aggregateLogReportWithAggregationStatus(rmApp, + LogAggregationStatus.SUCCEEDED); + + List capturedEvents = captureEventHandlerArguments(1); + assertEquals(1, capturedEvents.size()); + verifyAppManagerEvent(capturedEvents.get(0)); + } + + private void verifyAppBeforeFinishEvent(RMApp rmApp) { + assertEquals(0L, getFieldValue(rmApp, "logAggregationStartTime")); + assertEquals(0L, getFieldValue(rmApp, "finishTime")); + + verifyZeroInteractions(eventHandler); + verify(rmContext, never()).getRMApplicationHistoryWriter(); + verify(rmContext, never()).getSystemMetricsPublisher(); + } + + private void verifyAppAfterFinishEvent(RMAppImpl rmApp, int expectedEvents) { + assertTrue(testCaseStartTime < (long) getFieldValue(rmApp, + "logAggregationStartTime")); + assertTrue(testCaseStartTime < (long) getFieldValue(rmApp, "finishTime")); + + List capturedEvents = captureEventHandlerArguments( + expectedEvents); + assertEquals(expectedEvents, capturedEvents.size()); + if (expectedEvents == 1) { + verifyAppRemovedEvent(capturedEvents.get(0)); + } else if (expectedEvents > 1) { + verifyAppManagerEvent(capturedEvents.get(1)); + } + } + +} \ No newline at end of file