diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java index 354fcc4..ed764b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java @@ -43,9 +43,12 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; /** @@ -213,14 +216,15 @@ public void applicationStarted(RMApp app) { } @SuppressWarnings("unchecked") - public void applicationFinished(RMApp app) { + public void applicationFinished(RMApp app, + RMAppState finalState) { dispatcher.getEventHandler().handle( new WritingApplicationFinishEvent(app.getApplicationId(), ApplicationFinishData.newInstance(app.getApplicationId(), app.getFinishTime(), app.getDiagnostics().toString(), app.getFinalApplicationStatus(), - app.createApplicationState()))); + RMServerUtils.createApplicationState(finalState)))); } @SuppressWarnings("unchecked") @@ -234,14 +238,15 @@ public void applicationAttemptStarted(RMAppAttempt appAttempt) { } @SuppressWarnings("unchecked") - public void applicationAttemptFinished(RMAppAttempt appAttempt) { + public void applicationAttemptFinished(RMAppAttempt appAttempt, + RMAppAttemptState finalState) { dispatcher.getEventHandler().handle( new WritingApplicationAttemptFinishEvent(appAttempt.getAppAttemptId(), ApplicationAttemptFinishData.newInstance(appAttempt .getAppAttemptId(), appAttempt.getDiagnostics().toString(), appAttempt.getTrackingUrl(), appAttempt - .getFinalApplicationStatus(), appAttempt - .createApplicationAttemptState()))); + .getFinalApplicationStatus(), + RMServerUtils.createApplicationAttemptState(finalState)))); } @SuppressWarnings("unchecked") diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index e710168..8e42c58 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; @@ -107,7 +108,6 @@ private String queue; @SuppressWarnings("rawtypes") private EventHandler handler; - private static final FinalTransition FINAL_TRANSITION = new FinalTransition(); private static final AppFinishedTransition FINISHED_TRANSITION = new AppFinishedTransition(); private RMAppState stateBeforeFinalSaving; @@ -221,7 +221,8 @@ .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, - RMAppEventType.KILL, new KillAppAndAttemptTransition()) + RMAppEventType.KILL, + new KillAppAndAttemptTransition(RMAppState.FINISHED)) // ignorable transitions .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, EnumSet.of(RMAppEventType.NODE_UPDATE)) @@ -664,7 +665,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { public RMAppState transition(RMAppImpl app, RMAppEvent event) { if (app.recoveredFinalState != null) { - FINAL_TRANSITION.transition(app, event); + new FinalTransition(app.recoveredFinalState).transition(app, event); return app.recoveredFinalState; } // Directly call AttemptFailedTransition, since now we deem that an @@ -737,7 +738,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { LOG.info(msg); app.diagnostics.append(msg); // Inform the node for app-finish - FINAL_TRANSITION.transition(app, event); + new FinalTransition(RMAppState.FAILED).transition(app, event); } } @@ -850,6 +851,11 @@ public void transition(RMAppImpl app, RMAppEvent event) { } private static class AppFinishedTransition extends FinalTransition { + + public AppFinishedTransition() { + super(RMAppState.FINISHED); + } + public void transition(RMAppImpl app, RMAppEvent event) { RMAppFinishedAttemptEvent finishedEvent = (RMAppFinishedAttemptEvent)event; @@ -893,6 +899,15 @@ public void transition(RMAppImpl app, RMAppEvent event) { private static class AppKilledTransition extends FinalTransition { + + public AppKilledTransition() { + super(RMAppState.KILLED); + } + + public AppKilledTransition(RMAppState finalAppState) { + super(finalAppState); + } + @Override public void transition(RMAppImpl app, RMAppEvent event) { app.diagnostics.append("Application killed by user."); @@ -905,6 +920,15 @@ private static String getAppKilledDiagnostics() { } private static class KillAppAndAttemptTransition extends AppKilledTransition { + + public KillAppAndAttemptTransition() { + super(); + } + + public KillAppAndAttemptTransition(RMAppState finalAppState) { + super(finalAppState); + } + @SuppressWarnings("unchecked") @Override public void transition(RMAppImpl app, RMAppEvent event) { @@ -915,6 +939,11 @@ public void transition(RMAppImpl app, RMAppEvent event) { } private static final class AppRejectedTransition extends FinalTransition{ + + public AppRejectedTransition() { + super(RMAppState.FAILED); + } + public void transition(RMAppImpl app, RMAppEvent event) { RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent)event; app.diagnostics.append(rejectedEvent.getMessage()); @@ -924,6 +953,12 @@ public void transition(RMAppImpl app, RMAppEvent event) { private static class FinalTransition extends RMAppTransition { + private final RMAppState finalAppState; + + public FinalTransition(RMAppState finalAppState) { + this.finalAppState = finalAppState; + } + private Set getNodesOnWhichAttemptRan(RMAppImpl app) { Set nodes = new HashSet(); for (RMAppAttempt attempt : app.attempts.values()) { @@ -950,7 +985,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { // TODO: We need to fix for the problem that RMApp enters the final state // after RMAppAttempt in the killing case app.rmContext.getRMApplicationHistoryWriter() - .applicationFinished(app); + .applicationFinished(app, finalAppState); }; } @@ -1002,26 +1037,6 @@ public YarnApplicationState createApplicationState() { if (rmAppState.equals(RMAppState.FINAL_SAVING)) { rmAppState = stateBeforeFinalSaving; } - switch (rmAppState) { - case NEW: - return YarnApplicationState.NEW; - case NEW_SAVING: - return YarnApplicationState.NEW_SAVING; - case SUBMITTED: - return YarnApplicationState.SUBMITTED; - case ACCEPTED: - return YarnApplicationState.ACCEPTED; - case RUNNING: - return YarnApplicationState.RUNNING; - case FINISHING: - case FINISHED: - return YarnApplicationState.FINISHED; - case KILLED: - return YarnApplicationState.KILLED; - case FAILED: - return YarnApplicationState.FAILED; - default: - throw new YarnRuntimeException("Unknown state passed!"); - } + return RMServerUtils.createApplicationState(rmAppState); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 4a4790e..75cb725 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -787,7 +787,7 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.removeCredentials(appAttempt); appAttempt.rmContext.getRMApplicationHistoryWriter() - .applicationAttemptFinished(appAttempt); + .applicationAttemptFinished(appAttempt, RMAppAttemptState.FAILED); } } @@ -1052,7 +1052,7 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.removeCredentials(appAttempt); appAttempt.rmContext.getRMApplicationHistoryWriter() - .applicationAttemptFinished(appAttempt); + .applicationAttemptFinished(appAttempt, finalAttemptState); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java index 819bcba..476d10a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java @@ -133,8 +133,8 @@ private static RMApp createRMApp(ApplicationId appId) { new StringBuilder("test diagnostics info")); when(app.getFinalApplicationStatus()).thenReturn( FinalApplicationStatus.UNDEFINED); - when(app.createApplicationState()) - .thenReturn(YarnApplicationState.FINISHED); + when(app.getState()) + .thenReturn(RMAppState.FINISHED); return app; } @@ -152,8 +152,8 @@ private static RMAppAttempt createRMAppAttempt( when(appAttempt.getTrackingUrl()).thenReturn("test url"); when(appAttempt.getFinalApplicationStatus()).thenReturn( FinalApplicationStatus.UNDEFINED); - when(appAttempt.createApplicationAttemptState()).thenReturn( - YarnApplicationAttemptState.FINISHED); + when(appAttempt.getState()).thenReturn( + RMAppAttemptState.FINISHED); return appAttempt; } @@ -197,7 +197,7 @@ public void testWriteApplication() throws Exception { Assert.assertEquals(0L, appHD.getSubmitTime()); Assert.assertEquals(1L, appHD.getStartTime()); - writer.applicationFinished(app); + writer.applicationFinished(app, app.getState()); for (int i = 0; i < MAX_RETRIES; ++i) { appHD = store.getApplication(ApplicationId.newInstance(0, 1)); if (appHD.getYarnApplicationState() != null) { @@ -239,7 +239,7 @@ public void testWriteApplicationAttempt() throws Exception { ApplicationId.newInstance(0, 1), 1), 1), appAttemptHD.getMasterContainerId()); - writer.applicationAttemptFinished(appAttempt); + writer.applicationAttemptFinished(appAttempt, appAttempt.getState()); for (int i = 0; i < MAX_RETRIES; ++i) { appAttemptHD = store.getApplicationAttempt(ApplicationAttemptId.newInstance( @@ -325,9 +325,9 @@ public void testParallelWrite() throws Exception { writer.containerStarted(container); writer.containerFinished(container); } - writer.applicationAttemptFinished(appAttempt); + writer.applicationAttemptFinished(appAttempt, appAttempt.getState()); } - writer.applicationFinished(app); + writer.applicationFinished(app, app.getState()); } for (int i = 0; i < MAX_RETRIES; ++i) { if (allEventsHandled(20 * 10 * 10 + 20 * 10 + 20)) { @@ -367,13 +367,14 @@ protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() { public void applicationStarted(RMApp app) { } @Override - public void applicationFinished(RMApp app) { + public void applicationFinished(RMApp app, RMAppState finalState) { } @Override public void applicationAttemptStarted(RMAppAttempt appAttempt) { } @Override - public void applicationAttemptFinished(RMAppAttempt appAttempt) { + public void applicationAttemptFinished(RMAppAttempt appAttempt, + RMAppAttemptState finalState) { } @Override public void containerStarted(RMContainer container) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 75da09d..1da6bd2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -83,6 +83,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.ArgumentCaptor; @RunWith(value = Parameterized.class) @@ -514,7 +515,9 @@ public void testAppNewKill() throws IOException { application.handle(event); rmDispatcher.await(); assertKilled(application); - verify(writer).applicationFinished(any(RMApp.class)); + ArgumentCaptor argument = ArgumentCaptor.forClass(RMAppState.class); + verify(writer).applicationFinished(any(RMApp.class), argument.capture()); + Assert.assertEquals(RMAppState.KILLED, argument.getValue()); } @Test @@ -529,7 +532,9 @@ public void testAppNewReject() throws IOException { application.handle(event); rmDispatcher.await(); assertFailed(application, rejectedText); - verify(writer).applicationFinished(any(RMApp.class)); + ArgumentCaptor argument = ArgumentCaptor.forClass(RMAppState.class); + verify(writer).applicationFinished(any(RMApp.class), argument.capture()); + Assert.assertEquals(RMAppState.FAILED, argument.getValue()); } @Test (timeout = 30000) @@ -543,7 +548,9 @@ public void testAppNewSavingKill() throws IOException { application.handle(event); rmDispatcher.await(); assertKilled(application); - verify(writer).applicationFinished(any(RMApp.class)); + ArgumentCaptor argument = ArgumentCaptor.forClass(RMAppState.class); + verify(writer).applicationFinished(any(RMApp.class), argument.capture()); + Assert.assertEquals(RMAppState.KILLED, argument.getValue()); } @Test (timeout = 30000) @@ -558,7 +565,9 @@ public void testAppNewSavingReject() throws IOException { application.handle(event); rmDispatcher.await(); assertFailed(application, rejectedText); - verify(writer).applicationFinished(any(RMApp.class)); + ArgumentCaptor argument = ArgumentCaptor.forClass(RMAppState.class); + verify(writer).applicationFinished(any(RMApp.class), argument.capture()); + Assert.assertEquals(RMAppState.FAILED, argument.getValue()); } @Test (timeout = 30000) @@ -573,7 +582,9 @@ public void testAppSubmittedRejected() throws IOException { application.handle(event); rmDispatcher.await(); assertFailed(application, rejectedText); - verify(writer).applicationFinished(any(RMApp.class)); + ArgumentCaptor argument = ArgumentCaptor.forClass(RMAppState.class); + verify(writer).applicationFinished(any(RMApp.class), argument.capture()); + Assert.assertEquals(RMAppState.FAILED, argument.getValue()); } @Test @@ -586,7 +597,9 @@ public void testAppSubmittedKill() throws IOException, InterruptedException { application.handle(event); rmDispatcher.await(); assertAppAndAttemptKilled(application); - verify(writer).applicationFinished(any(RMApp.class)); + ArgumentCaptor argument = ArgumentCaptor.forClass(RMAppState.class); + verify(writer).applicationFinished(any(RMApp.class), argument.capture()); + Assert.assertEquals(RMAppState.KILLED, argument.getValue()); } @Test @@ -619,7 +632,9 @@ public void testAppAcceptedFailed() throws IOException { application.handle(event); rmDispatcher.await(); assertFailed(application, ".*" + message + ".*Failing the application.*"); - verify(writer).applicationFinished(any(RMApp.class)); + ArgumentCaptor argument = ArgumentCaptor.forClass(RMAppState.class); + verify(writer).applicationFinished(any(RMApp.class), argument.capture()); + Assert.assertEquals(RMAppState.FAILED, argument.getValue()); } @Test @@ -632,7 +647,9 @@ public void testAppAcceptedKill() throws IOException, InterruptedException { application.handle(event); rmDispatcher.await(); assertAppAndAttemptKilled(application); - verify(writer).applicationFinished(any(RMApp.class)); + ArgumentCaptor argument = ArgumentCaptor.forClass(RMAppState.class); + verify(writer).applicationFinished(any(RMApp.class), argument.capture()); + Assert.assertEquals(RMAppState.KILLED, argument.getValue()); } @Test @@ -654,7 +671,9 @@ public void testAppRunningKill() throws IOException { assertAppState(RMAppState.FINAL_SAVING, application); assertKilled(application); - verify(writer).applicationFinished(any(RMApp.class)); + ArgumentCaptor argument = ArgumentCaptor.forClass(RMAppState.class); + verify(writer).applicationFinished(any(RMApp.class), argument.capture()); + Assert.assertEquals(RMAppState.KILLED, argument.getValue()); } @Test @@ -706,7 +725,9 @@ public void testAppRunningFailed() throws IOException { application.handle(event); rmDispatcher.await(); assertFailed(application, ".*Failing the application.*"); - verify(writer).applicationFinished(any(RMApp.class)); + ArgumentCaptor argument = ArgumentCaptor.forClass(RMAppState.class); + verify(writer).applicationFinished(any(RMApp.class), argument.capture()); + Assert.assertEquals(RMAppState.FAILED, argument.getValue()); } @Test @@ -720,7 +741,9 @@ public void testAppFinishingKill() throws IOException { application.handle(event); rmDispatcher.await(); assertAppState(RMAppState.FINISHED, application); - verify(writer).applicationFinished(any(RMApp.class)); + ArgumentCaptor argument = ArgumentCaptor.forClass(RMAppState.class); + verify(writer).applicationFinished(any(RMApp.class), argument.capture()); + Assert.assertEquals(RMAppState.FINISHED, argument.getValue()); } // While App is at FINAL_SAVING, Attempt_Finished event may come before @@ -765,7 +788,9 @@ public void testAppFinishedFinished() throws IOException { StringBuilder diag = application.getDiagnostics(); Assert.assertEquals("application diagnostics is not correct", "", diag.toString()); - verify(writer).applicationFinished(any(RMApp.class)); + ArgumentCaptor argument = ArgumentCaptor.forClass(RMAppState.class); + verify(writer).applicationFinished(any(RMApp.class), argument.capture()); + Assert.assertEquals(RMAppState.FINISHED, argument.getValue()); } @Test (timeout = 30000) @@ -796,7 +821,9 @@ public void testAppFailedFailed() throws IOException { rmDispatcher.await(); assertTimesAtFinish(application); assertAppState(RMAppState.FAILED, application); - verify(writer).applicationFinished(any(RMApp.class)); + ArgumentCaptor argument = ArgumentCaptor.forClass(RMAppState.class); + verify(writer).applicationFinished(any(RMApp.class), argument.capture()); + Assert.assertEquals(RMAppState.FAILED, argument.getValue()); } @Test (timeout = 30000) @@ -852,7 +879,9 @@ public void testAppKilledKilled() throws IOException { rmDispatcher.await(); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); - verify(writer).applicationFinished(any(RMApp.class)); + ArgumentCaptor argument = ArgumentCaptor.forClass(RMAppState.class); + verify(writer).applicationFinished(any(RMApp.class), argument.capture()); + Assert.assertEquals(RMAppState.KILLED, argument.getValue()); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index d3092eb..836b248 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -75,9 +75,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; @@ -99,6 +99,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.ArgumentCaptor; @RunWith(value = Parameterized.class) public class TestRMAppAttemptTransitions { @@ -373,7 +374,11 @@ private void testAppAttemptKilledState(Container amContainer, assertEquals(0, applicationAttempt.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); - verify(writer).applicationAttemptFinished(any(RMAppAttempt.class)); + ArgumentCaptor argument = + ArgumentCaptor.forClass(RMAppAttemptState.class); + verify(writer).applicationAttemptFinished(any(RMAppAttempt.class), + argument.capture()); + assertEquals(RMAppAttemptState.KILLED, argument.getValue()); verifyAttemptFinalStateSaved(); } @@ -451,7 +456,11 @@ private void testAppAttemptFailedState(Container container, // Check events verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class)); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); - verify(writer).applicationAttemptFinished(any(RMAppAttempt.class)); + ArgumentCaptor argument = + ArgumentCaptor.forClass(RMAppAttemptState.class); + verify(writer).applicationAttemptFinished(any(RMAppAttempt.class), + argument.capture()); + assertEquals(RMAppAttemptState.FAILED, argument.getValue()); verifyAttemptFinalStateSaved(); } @@ -534,7 +543,11 @@ private void testAppAttemptFinishedState(Container container, assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); - verify(writer).applicationAttemptFinished(any(RMAppAttempt.class)); + ArgumentCaptor argument = + ArgumentCaptor.forClass(RMAppAttemptState.class); + verify(writer).applicationAttemptFinished(any(RMAppAttempt.class), + argument.capture()); + assertEquals(RMAppAttemptState.FINISHED, argument.getValue()); }