diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java index 71aa28b..2b2a5cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java @@ -26,10 +26,20 @@ import org.apache.hadoop.yarn.util.Records; /** - *

The response sent by the ResourceManager to the client - * aborting a submitted application.

- * - *

Currently it's empty.

+ *

+ * The response sent by the ResourceManager to the client aborting + * a submitted application. + *

+ *

+ * The response, includes: + *

+ * Note: user is recommended to wait until this flag becomes true, otherwise if + * the ResourceManager crashes before the application is + * successfully killed, the ResourceManager may retry this + * application on recovery. + *

* * @see ApplicationClientProtocol#forceKillApplication(KillApplicationRequest) */ @@ -38,9 +48,24 @@ public abstract class KillApplicationResponse { @Private @Unstable - public static KillApplicationResponse newInstance() { + public static KillApplicationResponse newInstance(boolean isKilled) { KillApplicationResponse response = Records.newRecord(KillApplicationResponse.class); + response.setIsKilled(isKilled); return response; } + + /** + * Get the flag which indicates that the application is successfully killed. + */ + @Public + @Stable + public abstract boolean getIsKilled(); + + /** + * Set the flag which indicates that the application is successfully killed. + */ + @Private + @Unstable + public abstract void setIsKilled(boolean isKilled); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index cfe71d4..7321f78 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -113,6 +113,7 @@ message KillApplicationRequestProto { } message KillApplicationResponseProto { + optional bool isKilled = 1 [default = false]; } message GetClusterMetricsRequestProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index d35e1a4..93704e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -179,11 +180,26 @@ public YarnClientApplication createApplication() @Override public void killApplication(ApplicationId applicationId) throws YarnException, IOException { - LOG.info("Killing application " + applicationId); KillApplicationRequest request = Records.newRecord(KillApplicationRequest.class); request.setApplicationId(applicationId); - rmClient.forceKillApplication(request); + + try { + while (true) { + KillApplicationResponse response = + rmClient.forceKillApplication(request); + if (response.getIsKilled()) { + break; + } + LOG.info("Watiting for application " + applicationId + " to be killed."); + Thread.sleep(100); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for application " + applicationId + + " to be killed."); + return; + } + LOG.info("Killed application " + applicationId); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java index 14e0c1f..6ffcd3e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java @@ -22,7 +22,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProtoOrBuilder; import com.google.protobuf.TextFormat; @@ -67,4 +69,24 @@ public boolean equals(Object other) { public String toString() { return TextFormat.shortDebugString(getProto()); } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = KillApplicationResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public boolean getIsKilled() { + KillApplicationResponseProtoOrBuilder p = + viaProto ? proto : builder; + return p.getIsKilled(); + } + + @Override + public void setIsKilled(boolean isKilled) { + maybeInitBuilder(); + builder.setIsKilled(isKilled); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index c3410a9..464091e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -87,8 +87,9 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -377,14 +378,17 @@ public KillApplicationResponse forceKillApplication( + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); } - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.KILL)); + if (application.getState().equals(RMAppState.KILLED)) { + return KillApplicationResponse.newInstance(true); + } - RMAuditLogger.logSuccess(callerUGI.getShortUserName(), - AuditConstants.KILL_APP_REQUEST, "ClientRMService" , applicationId); - KillApplicationResponse response = recordFactory - .newRecordInstance(KillApplicationResponse.class); - return response; + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppAttemptEvent(application.getCurrentAppAttempt() + .getAppAttemptId(), RMAppAttemptEventType.KILL)); + + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId); + return KillApplicationResponse.newInstance(false); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index a2fa0e2..0d7351e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -22,7 +22,6 @@ // Source: ClientRMService START, RECOVER, - KILL, // Source: RMAppAttempt APP_REJECTED, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 76d59ec..f183a91 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -135,9 +135,10 @@ RMAppState.RUNNING, RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED, RMAppState.FINAL_SAVING), RMAppEventType.RECOVER, new RMAppRecoveredTransition()) - .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL, + .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, + RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( - new AppKilledTransition(), RMAppState.KILLED)) + new AttemptKilledTransition(), RMAppState.KILLED)) .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.APP_REJECTED, new FinalSavingTransition( @@ -149,9 +150,9 @@ .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, RMAppEventType.APP_NEW_SAVED, new StartAppAttemptTransition()) .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, - RMAppEventType.KILL, + RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( - new AppKilledTransition(), RMAppState.KILLED)) + new AttemptKilledTransition(), RMAppState.KILLED)) .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, RMAppEventType.APP_REJECTED, new FinalSavingTransition(new AppRejectedTransition(), @@ -167,9 +168,9 @@ .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppEventType.APP_ACCEPTED) .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, - RMAppEventType.KILL, + RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( - new KillAppAndAttemptTransition(), RMAppState.KILLED)) + new AttemptKilledTransition(), RMAppState.KILLED)) // Transitions from ACCEPTED state .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, @@ -181,9 +182,9 @@ RMAppEventType.ATTEMPT_FAILED, new AttemptFailedTransition(RMAppState.SUBMITTED)) .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING, - RMAppEventType.KILL, + RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( - new KillAppAndAttemptTransition(), RMAppState.KILLED)) + new AttemptKilledTransition(), RMAppState.KILLED)) // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, @@ -201,9 +202,9 @@ RMAppEventType.ATTEMPT_FAILED, new AttemptFailedTransition(RMAppState.SUBMITTED)) .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, - RMAppEventType.KILL, + RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( - new KillAppAndAttemptTransition(), RMAppState.KILLED)) + new AttemptKilledTransition(), RMAppState.KILLED)) // Transitions from FINAL_SAVING state .addTransition(RMAppState.FINAL_SAVING, @@ -215,14 +216,14 @@ new AttemptFinishedAtFinalSavingTransition()) // ignorable transitions .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, - EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, + EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.ATTEMPT_KILLED, RMAppEventType.APP_NEW_SAVED)) // Transitions from FINISHING state .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, - RMAppEventType.KILL, new KillAppAndAttemptTransition()) + RMAppEventType.ATTEMPT_KILLED, new AttemptKilledTransition()) // ignorable transitions .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, EnumSet.of(RMAppEventType.NODE_UPDATE)) @@ -234,12 +235,12 @@ RMAppEventType.NODE_UPDATE, RMAppEventType.ATTEMPT_UNREGISTERED, RMAppEventType.ATTEMPT_FINISHED, - RMAppEventType.KILL)) + RMAppEventType.ATTEMPT_KILLED)) // Transitions from FAILED state // ignorable transitions .addTransition(RMAppState.FAILED, RMAppState.FAILED, - EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE)) + EnumSet.of(RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE)) // Transitions from KILLED state // ignorable transitions @@ -247,7 +248,7 @@ RMAppState.KILLED, RMAppState.KILLED, EnumSet.of(RMAppEventType.APP_ACCEPTED, - RMAppEventType.APP_REJECTED, RMAppEventType.KILL, + RMAppEventType.APP_REJECTED, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE)) @@ -809,7 +810,7 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event, RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; diags = getAppAttemptFailedDiagnostics(failedEvent); break; - case KILL: + case ATTEMPT_KILLED: diags = getAppKilledDiagnostics(); break; default: @@ -896,7 +897,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { } - private static class AppKilledTransition extends FinalTransition { + private static class AttemptKilledTransition extends FinalTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { app.diagnostics.append("Application killed by user."); @@ -908,15 +909,6 @@ private static String getAppKilledDiagnostics() { return "Application killed by user."; } - private static class KillAppAndAttemptTransition extends AppKilledTransition { - @SuppressWarnings("unchecked") - @Override - public void transition(RMAppImpl app, RMAppEvent event) { - app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(), - RMAppAttemptEventType.KILL)); - super.transition(app, event); - } - } private static final class AppRejectedTransition extends FinalTransition{ public void transition(RMAppImpl app, RMAppEvent event) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 78ca7d1..9d8f416 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -277,12 +278,12 @@ public void NMwaitForState(NodeId nodeid, NodeState finalState) node.getState()); } - public void killApp(ApplicationId appId) throws Exception { + public KillApplicationResponse killApp(ApplicationId appId) throws Exception { ApplicationClientProtocol client = getClientRMService(); - KillApplicationRequest req = Records - .newRecord(KillApplicationRequest.class); + KillApplicationRequest req = + Records.newRecord(KillApplicationRequest.class); req.setApplicationId(appId); - client.forceKillApplication(req); + return client.forceKillApplication(req); } // from AMLauncher diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 8fe8de4..63c36de 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -68,7 +68,7 @@ */ public class TestAppManager{ - private static RMAppEventType appEventType = RMAppEventType.KILL; + private static RMAppEventType appEventType = RMAppEventType.ATTEMPT_KILLED; public synchronized RMAppEventType getAppEventType() { return appEventType; @@ -216,7 +216,7 @@ public void setUp() { @After public void tearDown() { - setAppEventType(RMAppEventType.KILL); + setAppEventType(RMAppEventType.ATTEMPT_KILLED); ((Service)rmContext.getDispatcher()).stop(); } @@ -370,7 +370,8 @@ protected void setupDispatcher(RMContext rmContext, Configuration conf) { rmContext.getDispatcher().register(RMAppManagerEventType.class, testAppManagerDispatcher); ((Service)rmContext.getDispatcher()).init(conf); ((Service)rmContext.getDispatcher()).start(); - Assert.assertEquals("app event type is wrong before", RMAppEventType.KILL, appEventType); + Assert.assertEquals("app event type is wrong before", + RMAppEventType.ATTEMPT_KILLED, appEventType); } @Test @@ -383,7 +384,7 @@ public void testRMAppSubmit() throws Exception { // wait for event to be processed int timeoutSecs = 0; - while ((getAppEventType() == RMAppEventType.KILL) && + while ((getAppEventType() == RMAppEventType.ATTEMPT_KILLED) && timeoutSecs++ < 20) { Thread.sleep(1000); } @@ -423,11 +424,11 @@ public void testRMAppSubmitMaxAppAttempts() throws Exception { // wait for event to be processed int timeoutSecs = 0; - while ((getAppEventType() == RMAppEventType.KILL) && + while ((getAppEventType() == RMAppEventType.ATTEMPT_KILLED) && timeoutSecs++ < 20) { Thread.sleep(1000); } - setAppEventType(RMAppEventType.KILL); + setAppEventType(RMAppEventType.ATTEMPT_KILLED); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 9e14566..afbbc29 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -34,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -55,6 +54,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -68,9 +68,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -80,6 +77,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -93,7 +92,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mortbay.log.Log; public class TestRMRestart { @@ -917,8 +915,8 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), attemptState.getMasterContainer().getId()); - // Setting AMLivelinessMonitor interval to be 10 Secs. - conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000); + // Setting AMLivelinessMonitor interval to be 3 Secs. + conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 3000); // start new RM MockRM rm2 = new MockRM(conf, memStore); rm2.start(); @@ -1376,6 +1374,69 @@ protected void handleStoreEvent(RMStateStoreEvent event) { Assert.assertTrue(rmAppState.size() == NUM_APPS); } + // This is to test Killing application should be able to wait until app + // reaches killed state.and also check that attempt state is saved before app + // state is saved. + @Test + public void testKillApplicationOnRMRestart() throws Exception { + MemoryRMStateStore memStore = new TestKillAppMemoryRMStateStore(); + memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + RMApp app1 = + rm1.submitApp(200, "name", "user", null, false, "default", 1, null, + "myType"); + MockAM am1 = launchAM(app1, rm1, nm1); + + KillApplicationResponse response; + int count = 0; + while (true) { + response = rm1.killApp(app1.getApplicationId()); + if (response.getIsKilled()) { + break; + } + count++; + } + Assert.assertTrue(count > 0); + + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + Assert.assertEquals(1, + ((TestKillAppMemoryRMStateStore) memStore).updateAttempt); + Assert + .assertEquals(2, ((TestKillAppMemoryRMStateStore) memStore).updateApp); + } + + public class TestKillAppMemoryRMStateStore extends MemoryRMStateStore { + int count = 0; + public int updateApp = 0; + public int updateAttempt = 0; + + @Override + public void updateApplicationStateInternal(String appId, + ApplicationStateDataPBImpl appStateData) throws Exception { + updateApp = ++count; + super.updateApplicationStateInternal(appId, appStateData); + } + + @Override + public synchronized void + updateApplicationAttemptStateInternal(String attemptIdStr, + ApplicationAttemptStateDataPBImpl attemptStateData) + throws Exception { + updateAttempt = ++count; + super.updateApplicationAttemptStateInternal(attemptIdStr, + attemptStateData); + } + } + + public static class TestSecurityMockRM extends MockRM { public TestSecurityMockRM(Configuration conf, RMStateStore store) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index e3e0e69..08bbc09 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -486,9 +486,10 @@ public void testAppNewKill() throws IOException { LOG.info("--- START: testAppNewKill ---"); RMApp application = createNewTestApp(null); - // NEW => KILLED event RMAppEventType.KILL + // NEW => KILLED event RMAppEventType.ATTEMPT_KILLED RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -515,9 +516,10 @@ public void testAppNewSavingKill() throws IOException { LOG.info("--- START: testAppNewSavingKill ---"); RMApp application = testCreateAppNewSaving(null); - // NEW_SAVING => KILLED event RMAppEventType.KILL + // NEW_SAVING => KILLED event RMAppEventType.ATTEMPT_KILLED RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -558,9 +560,9 @@ public void testAppSubmittedRejected() throws IOException { public void testAppSubmittedKill() throws IOException, InterruptedException { LOG.info("--- START: testAppSubmittedKill---"); RMApp application = testCreateAppSubmittedNoRecovery(null); - // SUBMITTED => KILLED event RMAppEventType.KILL + // SUBMITTED => KILLED event RMAppEventType.ATTEMPT_KILLED RMAppEvent event = new RMAppEvent(application.getApplicationId(), - RMAppEventType.KILL); + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); assertAppAndAttemptKilled(application); @@ -603,9 +605,9 @@ public void testAppAcceptedFailed() throws IOException { public void testAppAcceptedKill() throws IOException, InterruptedException { LOG.info("--- START: testAppAcceptedKill ---"); RMApp application = testCreateAppAccepted(null); - // ACCEPTED => KILLED event RMAppEventType.KILL + // ACCEPTED => KILLED event RMAppEventType.ATTEMPT_KILLED RMAppEvent event = new RMAppEvent(application.getApplicationId(), - RMAppEventType.KILL); + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); assertAppAndAttemptKilled(application); @@ -616,9 +618,10 @@ public void testAppRunningKill() throws IOException { LOG.info("--- START: testAppRunningKill ---"); RMApp application = testCreateAppRunning(null); - // RUNNING => KILLED event RMAppEventType.KILL + // RUNNING => KILLED event RMAppEventType.ATTEMPT_KILLED RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); @@ -677,8 +680,9 @@ public void testAppRunningFailed() throws IOException { sendAppUpdateSavedEvent(application); assertFailed(application, ".*Failing the application.*"); - // FAILED => FAILED event RMAppEventType.KILL - event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + // FAILED => FAILED event RMAppEventType.ATTEMPT_KILLED + event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); assertFailed(application, ".*Failing the application.*"); @@ -689,9 +693,9 @@ public void testAppFinishingKill() throws IOException { LOG.info("--- START: testAppFinishedFinished ---"); RMApp application = testCreateAppFinishing(null); - // FINISHING => FINISHED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + // FINISHING => FINISHED event RMAppEventType.ATTEMPT_KILLED + RMAppEvent event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); assertAppState(RMAppState.FINISHED, application); @@ -729,9 +733,9 @@ public void testAppFinishedFinished() throws IOException { LOG.info("--- START: testAppFinishedFinished ---"); RMApp application = testCreateAppFinished(null, ""); - // FINISHED => FINISHED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + // FINISHED => FINISHED event RMAppEventType.ATTEMPT_KILLED + RMAppEvent event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); assertTimesAtFinish(application); @@ -756,9 +760,9 @@ public void testAppFailedFailed() throws IOException { assertTimesAtFinish(application); assertAppState(RMAppState.FAILED, application); - // FAILED => FAILED event RMAppEventType.KILL - event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + // FAILED => FAILED event RMAppEventType.ATTEMPT_KILLED + event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); assertTimesAtFinish(application); @@ -774,9 +778,9 @@ public void testAppKilledKilled() throws IOException { RMApp application = testCreateAppRunning(null); - // RUNNING => KILLED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + // RUNNING => KILLED event RMAppEventType.ATTEMPT_KILLED + RMAppEvent event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -809,8 +813,9 @@ public void testAppKilledKilled() throws IOException { assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); - // KILLED => KILLED event RMAppEventType.KILL - event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + // KILLED => KILLED event RMAppEventType.ATTEMPT_KILLED + event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); assertTimesAtFinish(application);