diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java index 71aa28b..2b2a5cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java @@ -26,10 +26,20 @@ import org.apache.hadoop.yarn.util.Records; /** - *

The response sent by the ResourceManager to the client - * aborting a submitted application.

- * - *

Currently it's empty.

+ *

+ * The response sent by the ResourceManager to the client aborting + * a submitted application. + *

+ *

+ * The response, includes: + *

+ * Note: user is recommended to wait until this flag becomes true, otherwise if + * the ResourceManager crashes before the application is + * successfully killed, the ResourceManager may retry this + * application on recovery. + *

* * @see ApplicationClientProtocol#forceKillApplication(KillApplicationRequest) */ @@ -38,9 +48,24 @@ public abstract class KillApplicationResponse { @Private @Unstable - public static KillApplicationResponse newInstance() { + public static KillApplicationResponse newInstance(boolean isKilled) { KillApplicationResponse response = Records.newRecord(KillApplicationResponse.class); + response.setIsKilled(isKilled); return response; } + + /** + * Get the flag which indicates that the application is successfully killed. + */ + @Public + @Stable + public abstract boolean getIsKilled(); + + /** + * Set the flag which indicates that the application is successfully killed. + */ + @Private + @Unstable + public abstract void setIsKilled(boolean isKilled); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index cfe71d4..7321f78 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -113,6 +113,7 @@ message KillApplicationRequestProto { } message KillApplicationResponseProto { + optional bool isKilled = 1 [default = false]; } message GetClusterMetricsRequestProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index d35e1a4..2ad1f2f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -179,11 +180,30 @@ public YarnClientApplication createApplication() @Override public void killApplication(ApplicationId applicationId) throws YarnException, IOException { - LOG.info("Killing application " + applicationId); KillApplicationRequest request = Records.newRecord(KillApplicationRequest.class); request.setApplicationId(applicationId); - rmClient.forceKillApplication(request); + + try { + int pollCount = 0; + while (true) { + KillApplicationResponse response = + rmClient.forceKillApplication(request); + if (response.getIsKilled()) { + break; + } + if (++pollCount % 10 == 0) { + LOG.info("Watiting for application " + applicationId + + " to be killed."); + } + Thread.sleep(100); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for application " + applicationId + + " to be killed."); + return; + } + LOG.info("Killed application " + applicationId); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java index 14e0c1f..6ffcd3e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java @@ -22,7 +22,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProtoOrBuilder; import com.google.protobuf.TextFormat; @@ -67,4 +69,24 @@ public boolean equals(Object other) { public String toString() { return TextFormat.shortDebugString(getProto()); } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = KillApplicationResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public boolean getIsKilled() { + KillApplicationResponseProtoOrBuilder p = + viaProto ? proto : builder; + return p.getIsKilled(); + } + + @Override + public void setIsKilled(boolean isKilled) { + maybeInitBuilder(); + builder.setIsKilled(isKilled); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index f070f28..705b212 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -298,7 +298,7 @@ public FinishApplicationMasterResponse finishApplicationMaster( .getDiagnostics())); if (rmContext.getRMApps().get(applicationAttemptId.getApplicationId()) - .isAppSafeToUnregister()) { + .isAppSafeToTerminate()) { return FinishApplicationMasterResponse.newInstance(true); } else { return FinishApplicationMasterResponse.newInstance(false); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index c3410a9..89e1778 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -87,8 +87,8 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -377,14 +377,17 @@ public KillApplicationResponse forceKillApplication( + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); } - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.KILL)); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppAttemptEvent(application.getCurrentAppAttempt() + .getAppAttemptId(), RMAppAttemptEventType.KILL)); - RMAuditLogger.logSuccess(callerUGI.getShortUserName(), - AuditConstants.KILL_APP_REQUEST, "ClientRMService" , applicationId); - KillApplicationResponse response = recordFactory - .newRecordInstance(KillApplicationResponse.class); - return response; + if (application.isAppSafeToTerminate()) { + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId); + return KillApplicationResponse.newInstance(true); + } else { + return KillApplicationResponse.newInstance(false); + } } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index fadaa3b..1809a4b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -197,13 +197,13 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, String getApplicationType(); /** - * Check whether this application is safe to unregister. - * An application is deemed to be safe to unregister if it is an unmanaged - * AM or its state has been removed from state store. + * Check whether this application is safe to terminate. + * An application is deemed to be safe to terminate if it is an unmanaged + * AM or its state has been saved in state store. * @return the flag which indicates whether this application is safe to - * unregister. + * terminate. */ - boolean isAppSafeToUnregister(); + boolean isAppSafeToTerminate(); /** * Create the external user-facing state of ApplicationMaster from the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index a2fa0e2..0d7351e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -22,7 +22,6 @@ // Source: ClientRMService START, RECOVER, - KILL, // Source: RMAppAttempt APP_REJECTED, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 76d59ec..cc1ac9f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -135,9 +135,10 @@ RMAppState.RUNNING, RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED, RMAppState.FINAL_SAVING), RMAppEventType.RECOVER, new RMAppRecoveredTransition()) - .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL, + .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, + RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( - new AppKilledTransition(), RMAppState.KILLED)) + new AttemptKilledTransition(), RMAppState.KILLED)) .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.APP_REJECTED, new FinalSavingTransition( @@ -149,9 +150,9 @@ .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, RMAppEventType.APP_NEW_SAVED, new StartAppAttemptTransition()) .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, - RMAppEventType.KILL, + RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( - new AppKilledTransition(), RMAppState.KILLED)) + new AttemptKilledTransition(), RMAppState.KILLED)) .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, RMAppEventType.APP_REJECTED, new FinalSavingTransition(new AppRejectedTransition(), @@ -167,9 +168,9 @@ .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppEventType.APP_ACCEPTED) .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, - RMAppEventType.KILL, + RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( - new KillAppAndAttemptTransition(), RMAppState.KILLED)) + new AttemptKilledTransition(), RMAppState.KILLED)) // Transitions from ACCEPTED state .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, @@ -181,9 +182,9 @@ RMAppEventType.ATTEMPT_FAILED, new AttemptFailedTransition(RMAppState.SUBMITTED)) .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING, - RMAppEventType.KILL, + RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( - new KillAppAndAttemptTransition(), RMAppState.KILLED)) + new AttemptKilledTransition(), RMAppState.KILLED)) // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, @@ -201,9 +202,9 @@ RMAppEventType.ATTEMPT_FAILED, new AttemptFailedTransition(RMAppState.SUBMITTED)) .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, - RMAppEventType.KILL, + RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( - new KillAppAndAttemptTransition(), RMAppState.KILLED)) + new AttemptKilledTransition(), RMAppState.KILLED)) // Transitions from FINAL_SAVING state .addTransition(RMAppState.FINAL_SAVING, @@ -215,17 +216,15 @@ new AttemptFinishedAtFinalSavingTransition()) // ignorable transitions .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, - EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, + EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.ATTEMPT_KILLED, RMAppEventType.APP_NEW_SAVED)) // Transitions from FINISHING state .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) - .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, - RMAppEventType.KILL, new KillAppAndAttemptTransition()) // ignorable transitions .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, - EnumSet.of(RMAppEventType.NODE_UPDATE)) + EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.ATTEMPT_KILLED)) // Transitions from FINISHED state // ignorable transitions @@ -234,12 +233,12 @@ RMAppEventType.NODE_UPDATE, RMAppEventType.ATTEMPT_UNREGISTERED, RMAppEventType.ATTEMPT_FINISHED, - RMAppEventType.KILL)) + RMAppEventType.ATTEMPT_KILLED)) // Transitions from FAILED state // ignorable transitions .addTransition(RMAppState.FAILED, RMAppState.FAILED, - EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE)) + EnumSet.of(RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE)) // Transitions from KILLED state // ignorable transitions @@ -247,7 +246,7 @@ RMAppState.KILLED, RMAppState.KILLED, EnumSet.of(RMAppEventType.APP_ACCEPTED, - RMAppEventType.APP_REJECTED, RMAppEventType.KILL, + RMAppEventType.APP_REJECTED, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE)) @@ -809,7 +808,7 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event, RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; diags = getAppAttemptFailedDiagnostics(failedEvent); break; - case KILL: + case ATTEMPT_KILLED: diags = getAppKilledDiagnostics(); break; default: @@ -896,7 +895,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { } - private static class AppKilledTransition extends FinalTransition { + private static class AttemptKilledTransition extends FinalTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { app.diagnostics.append("Application killed by user."); @@ -908,15 +907,6 @@ private static String getAppKilledDiagnostics() { return "Application killed by user."; } - private static class KillAppAndAttemptTransition extends AppKilledTransition { - @SuppressWarnings("unchecked") - @Override - public void transition(RMAppImpl app, RMAppEvent event) { - app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(), - RMAppAttemptEventType.KILL)); - super.transition(app, event); - } - } private static final class AppRejectedTransition extends FinalTransition{ public void transition(RMAppImpl app, RMAppEvent event) { @@ -984,9 +974,12 @@ public String getApplicationType() { } @Override - public boolean isAppSafeToUnregister() { + public boolean isAppSafeToTerminate() { RMAppState state = getState(); - return state.equals(RMAppState.FINISHING) + boolean isRecoveryEnabled = + conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); + return !isRecoveryEnabled || state.equals(RMAppState.FINISHING) || state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED) || state.equals(RMAppState.KILLED) || // If this is an unmanaged AM, we are safe to unregister since unmanaged diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 9944c9c..6945378 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -277,12 +278,10 @@ public void NMwaitForState(NodeId nodeid, NodeState finalState) node.getState()); } - public void killApp(ApplicationId appId) throws Exception { + public KillApplicationResponse killApp(ApplicationId appId) throws Exception { ApplicationClientProtocol client = getClientRMService(); - KillApplicationRequest req = Records - .newRecord(KillApplicationRequest.class); - req.setApplicationId(appId); - client.forceKillApplication(req); + KillApplicationRequest req = KillApplicationRequest.newInstance(appId); + return client.forceKillApplication(req); } // from AMLauncher diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 8fe8de4..63c36de 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -68,7 +68,7 @@ */ public class TestAppManager{ - private static RMAppEventType appEventType = RMAppEventType.KILL; + private static RMAppEventType appEventType = RMAppEventType.ATTEMPT_KILLED; public synchronized RMAppEventType getAppEventType() { return appEventType; @@ -216,7 +216,7 @@ public void setUp() { @After public void tearDown() { - setAppEventType(RMAppEventType.KILL); + setAppEventType(RMAppEventType.ATTEMPT_KILLED); ((Service)rmContext.getDispatcher()).stop(); } @@ -370,7 +370,8 @@ protected void setupDispatcher(RMContext rmContext, Configuration conf) { rmContext.getDispatcher().register(RMAppManagerEventType.class, testAppManagerDispatcher); ((Service)rmContext.getDispatcher()).init(conf); ((Service)rmContext.getDispatcher()).start(); - Assert.assertEquals("app event type is wrong before", RMAppEventType.KILL, appEventType); + Assert.assertEquals("app event type is wrong before", + RMAppEventType.ATTEMPT_KILLED, appEventType); } @Test @@ -383,7 +384,7 @@ public void testRMAppSubmit() throws Exception { // wait for event to be processed int timeoutSecs = 0; - while ((getAppEventType() == RMAppEventType.KILL) && + while ((getAppEventType() == RMAppEventType.ATTEMPT_KILLED) && timeoutSecs++ < 20) { Thread.sleep(1000); } @@ -423,11 +424,11 @@ public void testRMAppSubmitMaxAppAttempts() throws Exception { // wait for event to be processed int timeoutSecs = 0; - while ((getAppEventType() == RMAppEventType.KILL) && + while ((getAppEventType() == RMAppEventType.ATTEMPT_KILLED) && timeoutSecs++ < 20) { Thread.sleep(1000); } - setAppEventType(RMAppEventType.KILL); + setAppEventType(RMAppEventType.ATTEMPT_KILLED); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 38f8542..4d4a1cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -34,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -55,6 +54,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -68,9 +68,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -80,6 +77,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -93,7 +92,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mortbay.log.Log; public class TestRMRestart { @@ -917,8 +915,8 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), attemptState.getMasterContainer().getId()); - // Setting AMLivelinessMonitor interval to be 10 Secs. - conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000); + // Setting AMLivelinessMonitor interval to be 3 Secs. + conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 3000); // start new RM MockRM rm2 = new MockRM(conf, memStore); rm2.start(); @@ -1378,6 +1376,69 @@ protected void handleStoreEvent(RMStateStoreEvent event) { Assert.assertTrue(rmAppState.size() == NUM_APPS); } + // This is to test Killing application should be able to wait until app + // reaches killed state and also check that attempt state is saved before app + // state is saved. + @Test + public void testKillApplicationOnRMRestart() throws Exception { + MemoryRMStateStore memStore = new TestKillAppMemoryRMStateStore(); + memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + RMApp app1 = + rm1.submitApp(200, "name", "user", null, false, "default", 1, null, + "myType"); + MockAM am1 = launchAM(app1, rm1, nm1); + + KillApplicationResponse response; + int count = 0; + while (true) { + response = rm1.killApp(app1.getApplicationId()); + if (response.getIsKilled()) { + break; + } + count++; + } + Assert.assertTrue(count > 0); + + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + Assert.assertEquals(1, + ((TestKillAppMemoryRMStateStore) memStore).updateAttempt); + Assert + .assertEquals(2, ((TestKillAppMemoryRMStateStore) memStore).updateApp); + } + + public class TestKillAppMemoryRMStateStore extends MemoryRMStateStore { + int count = 0; + public int updateApp = 0; + public int updateAttempt = 0; + + @Override + public void updateApplicationStateInternal(String appId, + ApplicationStateDataPBImpl appStateData) throws Exception { + updateApp = ++count; + super.updateApplicationStateInternal(appId, appStateData); + } + + @Override + public synchronized void + updateApplicationAttemptStateInternal(String attemptIdStr, + ApplicationAttemptStateDataPBImpl attemptStateData) + throws Exception { + updateAttempt = ++count; + super.updateApplicationAttemptStateInternal(attemptIdStr, + attemptStateData); + } + } + + public static class TestSecurityMockRM extends MockRM { public TestSecurityMockRM(Configuration conf, RMStateStore store) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index b90c711..aa116bf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -145,7 +145,7 @@ public void setQueue(String name) { } @Override - public boolean isAppSafeToUnregister() { + public boolean isAppSafeToTerminate() { throw new UnsupportedOperationException("Not supported yet."); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index bcb2f6f..debcffe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -218,7 +218,7 @@ public String getApplicationType() { } @Override - public boolean isAppSafeToUnregister() { + public boolean isAppSafeToTerminate() { return true; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 5b68723..22351cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -230,6 +230,7 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext) testAppStartState(applicationId, user, name, queue, application); this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), application); + rmDispatcher.await(); return application; } @@ -297,18 +298,6 @@ private void assertKilled(RMApp application) { StringBuilder diag = application.getDiagnostics(); Assert.assertEquals("application diagnostics is not correct", "Application killed by user.", diag.toString()); - } - - private void assertAppAndAttemptKilled(RMApp application) - throws InterruptedException { - sendAppUpdateSavedEvent(application); - assertKilled(application); - // send attempt final state saved event. - application.getCurrentAppAttempt().handle( - new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt() - .getAppAttemptId(), null)); - Assert.assertEquals(RMAppAttemptState.KILLED, application - .getCurrentAppAttempt().getAppAttemptState()); assertAppFinalStateSaved(application); } @@ -329,6 +318,18 @@ private void sendAppUpdateSavedEvent(RMApp application) { rmDispatcher.await(); } + private void killAppAndAttempt(RMApp application) { + // Kill the attempt which leads the the application to be killed. + application.getCurrentAppAttempt().handle( + new RMAppAttemptEvent(application.getCurrentAppAttempt() + .getAppAttemptId(), RMAppAttemptEventType.KILL)); + application.getCurrentAppAttempt().handle( + new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt() + .getAppAttemptId(), null)); + Assert.assertEquals(RMAppAttemptState.KILLED, application + .getCurrentAppAttempt().getAppAttemptState()); + } + protected RMApp testCreateAppNewSaving( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = createNewTestApp(submissionContext); @@ -487,9 +488,10 @@ public void testAppNewKill() throws IOException { LOG.info("--- START: testAppNewKill ---"); RMApp application = createNewTestApp(null); - // NEW => KILLED event RMAppEventType.KILL + // NEW => KILLED event RMAppEventType.ATTEMPT_KILLED RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -516,9 +518,10 @@ public void testAppNewSavingKill() throws IOException { LOG.info("--- START: testAppNewSavingKill ---"); RMApp application = testCreateAppNewSaving(null); - // NEW_SAVING => KILLED event RMAppEventType.KILL + // NEW_SAVING => KILLED event RMAppEventType.ATTEMPT_KILLED RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -559,12 +562,9 @@ public void testAppSubmittedRejected() throws IOException { public void testAppSubmittedKill() throws IOException, InterruptedException { LOG.info("--- START: testAppSubmittedKill---"); RMApp application = testCreateAppSubmittedNoRecovery(null); - // SUBMITTED => KILLED event RMAppEventType.KILL - RMAppEvent event = new RMAppEvent(application.getApplicationId(), - RMAppEventType.KILL); - application.handle(event); - rmDispatcher.await(); - assertAppAndAttemptKilled(application); + killAppAndAttempt(application); + sendAppUpdateSavedEvent(application); + assertKilled(application); } @Test @@ -604,12 +604,10 @@ public void testAppAcceptedFailed() throws IOException { public void testAppAcceptedKill() throws IOException, InterruptedException { LOG.info("--- START: testAppAcceptedKill ---"); RMApp application = testCreateAppAccepted(null); - // ACCEPTED => KILLED event RMAppEventType.KILL - RMAppEvent event = new RMAppEvent(application.getApplicationId(), - RMAppEventType.KILL); - application.handle(event); - rmDispatcher.await(); - assertAppAndAttemptKilled(application); + // ACCEPTED => KILLED event RMAppEventType.ATTEMPT_KILLED + killAppAndAttempt(application); + sendAppUpdateSavedEvent(application); + assertKilled(application); } @Test @@ -617,11 +615,8 @@ public void testAppRunningKill() throws IOException { LOG.info("--- START: testAppRunningKill ---"); RMApp application = testCreateAppRunning(null); - // RUNNING => KILLED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); - application.handle(event); - rmDispatcher.await(); + // RUNNING => KILLED event RMAppEventType.ATTEMPT_KILLED + killAppAndAttempt(application); // Ignore Attempt_Finished if we were supposed to go to Finished. assertAppState(RMAppState.FINAL_SAVING, application); @@ -678,26 +673,14 @@ public void testAppRunningFailed() throws IOException { sendAppUpdateSavedEvent(application); assertFailed(application, ".*Failing the application.*"); - // FAILED => FAILED event RMAppEventType.KILL - event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + // FAILED => FAILED event RMAppEventType.ATTEMPT_KILLED + event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); assertFailed(application, ".*Failing the application.*"); } - @Test - public void testAppFinishingKill() throws IOException { - LOG.info("--- START: testAppFinishedFinished ---"); - - RMApp application = testCreateAppFinishing(null); - // FINISHING => FINISHED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); - application.handle(event); - rmDispatcher.await(); - assertAppState(RMAppState.FINISHED, application); - } - // While App is at FINAL_SAVING, Attempt_Finished event may come before // App_Saved event, we stay on FINAL_SAVING on Attempt_Finished event // and then directly jump from FINAL_SAVING to FINISHED state on App_Saved @@ -730,9 +713,9 @@ public void testAppFinishedFinished() throws IOException { LOG.info("--- START: testAppFinishedFinished ---"); RMApp application = testCreateAppFinished(null, ""); - // FINISHED => FINISHED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + // FINISHED => FINISHED event RMAppEventType.ATTEMPT_KILLED + RMAppEvent event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); assertTimesAtFinish(application); @@ -757,9 +740,9 @@ public void testAppFailedFailed() throws IOException { assertTimesAtFinish(application); assertAppState(RMAppState.FAILED, application); - // FAILED => FAILED event RMAppEventType.KILL - event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + // FAILED => FAILED event RMAppEventType.ATTEMPT_KILLED + event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); assertTimesAtFinish(application); @@ -775,9 +758,9 @@ public void testAppKilledKilled() throws IOException { RMApp application = testCreateAppRunning(null); - // RUNNING => KILLED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + // RUNNING => KILLED event RMAppEventType.ATTEMPT_KILLED + RMAppEvent event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -810,8 +793,9 @@ public void testAppKilledKilled() throws IOException { assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); - // KILLED => KILLED event RMAppEventType.KILL - event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + // KILLED => KILLED event RMAppEventType.ATTEMPT_KILLED + event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); rmDispatcher.await(); assertTimesAtFinish(application);