diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java index 71aa28b..77bb71d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java @@ -26,10 +26,21 @@ import org.apache.hadoop.yarn.util.Records; /** - *

The response sent by the ResourceManager to the client - * aborting a submitted application.

- * - *

Currently it's empty.

+ *

+ * The response sent by the ResourceManager to the client aborting + * a submitted application. + *

+ *

+ * The response, includes: + *

+ * Note: user is recommended to wait until this flag becomes true, otherwise if + * the ResourceManager crashes before the process of killing the + * application is completed, the ResourceManager may retry this + * application on recovery. + *

* * @see ApplicationClientProtocol#forceKillApplication(KillApplicationRequest) */ @@ -38,9 +49,24 @@ public abstract class KillApplicationResponse { @Private @Unstable - public static KillApplicationResponse newInstance() { + public static KillApplicationResponse newInstance(boolean isKillCompleted) { KillApplicationResponse response = Records.newRecord(KillApplicationResponse.class); + response.setIsKillCompleted(isKillCompleted); return response; } + + /** + * Get the flag which indicates that the process of killing application is completed or not. + */ + @Public + @Stable + public abstract boolean getIsKillCompleted(); + + /** + * Set the flag which indicates that the process of killing application is completed or not. + */ + @Private + @Unstable + public abstract void setIsKillCompleted(boolean isKillCompleted); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 839765c..ef6e01b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -26,7 +26,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; @@ -879,15 +878,23 @@ //////////////////////////////// /** + * Use YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS instead. * The interval of the yarn client's querying application state after * application submission. The unit is millisecond. */ + @Deprecated public static final String YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS = YARN_PREFIX + "client.app-submission.poll-interval"; - public static final long DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS = - 1000; /** + * The interval that the yarn client library uses to poll the completion + * status of the asynchronous API of application client protocol. + */ + public static final String YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS = + YARN_PREFIX + "client.application-client-protocol.poll-interval-ms"; + public static final long DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS = + 200; + /** * Max number of threads in NMClientAsync to process container management * events */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 332be81..a4631d1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -116,6 +116,7 @@ message KillApplicationRequestProto { } message KillApplicationResponseProto { + optional bool is_kill_completed = 1 [default = false]; } message GetClusterMetricsRequestProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index d35e1a4..7c44604 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -79,7 +80,8 @@ protected ApplicationClientProtocol rmClient; protected InetSocketAddress rmAddress; - protected long statePollIntervalMillis; + protected long submitPollIntervalMillis; + private long asyncApiPollIntervalMillis; private static final String ROOT = "root"; @@ -92,12 +94,20 @@ private static InetSocketAddress getRmAddress(Configuration conf) { YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT); } + @SuppressWarnings("deprecation") @Override protected void serviceInit(Configuration conf) throws Exception { this.rmAddress = getRmAddress(conf); - statePollIntervalMillis = conf.getLong( + asyncApiPollIntervalMillis = + conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, + YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); + submitPollIntervalMillis = asyncApiPollIntervalMillis; + if (conf.get(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS) + != null) { + submitPollIntervalMillis = conf.getLong( YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS, - YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS); + YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); + } super.serviceInit(conf); } @@ -165,7 +175,7 @@ public YarnClientApplication createApplication() " is still in " + state); } try { - Thread.sleep(statePollIntervalMillis); + Thread.sleep(submitPollIntervalMillis); } catch (InterruptedException ie) { } } @@ -179,11 +189,29 @@ public YarnClientApplication createApplication() @Override public void killApplication(ApplicationId applicationId) throws YarnException, IOException { - LOG.info("Killing application " + applicationId); KillApplicationRequest request = Records.newRecord(KillApplicationRequest.class); request.setApplicationId(applicationId); - rmClient.forceKillApplication(request); + + try { + int pollCount = 0; + while (true) { + KillApplicationResponse response = + rmClient.forceKillApplication(request); + if (response.getIsKillCompleted()) { + break; + } + if (++pollCount % 10 == 0) { + LOG.info("Watiting for application " + applicationId + + " to be killed."); + } + Thread.sleep(asyncApiPollIntervalMillis); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for application " + applicationId + + " to be killed."); + } + LOG.info("Killed application " + applicationId); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 826433d..72b5b28c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -89,6 +89,7 @@ public void testClientStop() { rm.stop(); } + @SuppressWarnings("deprecation") @Test (timeout = 30000) public void testSubmitApplication() { Configuration conf = new Configuration(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java index 14e0c1f..139c2a5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java @@ -22,7 +22,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProtoOrBuilder; import com.google.protobuf.TextFormat; @@ -67,4 +69,24 @@ public boolean equals(Object other) { public String toString() { return TextFormat.shortDebugString(getProto()); } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = KillApplicationResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public boolean getIsKillCompleted() { + KillApplicationResponseProtoOrBuilder p = + viaProto ? proto : builder; + return p.getIsKillCompleted(); + } + + @Override + public void setIsKillCompleted(boolean isKillCompleted) { + maybeInitBuilder(); + builder.setIsKillCompleted(isKillCompleted); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index c43dc1a..9673826 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -945,10 +945,10 @@ - The interval of the yarn client's querying application state - after application submission. The unit is millisecond. - yarn.client.app-submission.poll-interval - 1000 + The interval that the yarn client library uses to poll the + completion status of the asynchronous API of application client protocol. + + yarn.client.application-client-protocol.poll-interval-ms + 200 - diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index f070f28..16091c6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -292,15 +292,14 @@ public FinishApplicationMasterResponse finishApplicationMaster( this.amLivelinessMonitor.receivedPing(applicationAttemptId); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptUnregistrationEvent(applicationAttemptId, request - .getTrackingUrl(), request.getFinalApplicationStatus(), request - .getDiagnostics())); - if (rmContext.getRMApps().get(applicationAttemptId.getApplicationId()) - .isAppSafeToUnregister()) { + .isAppSafeToTerminate()) { return FinishApplicationMasterResponse.newInstance(true); } else { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptUnregistrationEvent(applicationAttemptId, request + .getTrackingUrl(), request.getFinalApplicationStatus(), request + .getDiagnostics())); return FinishApplicationMasterResponse.newInstance(false); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index f0e8553..cd2226f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -380,14 +380,15 @@ public KillApplicationResponse forceKillApplication( + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); } - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.KILL)); - - RMAuditLogger.logSuccess(callerUGI.getShortUserName(), - AuditConstants.KILL_APP_REQUEST, "ClientRMService" , applicationId); - KillApplicationResponse response = recordFactory - .newRecordInstance(KillApplicationResponse.class); - return response; + if (application.isAppSafeToTerminate()) { + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId); + return KillApplicationResponse.newInstance(true); + } else { + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(applicationId, RMAppEventType.KILL)); + return KillApplicationResponse.newInstance(false); + } } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index fadaa3b..1809a4b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -197,13 +197,13 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, String getApplicationType(); /** - * Check whether this application is safe to unregister. - * An application is deemed to be safe to unregister if it is an unmanaged - * AM or its state has been removed from state store. + * Check whether this application is safe to terminate. + * An application is deemed to be safe to terminate if it is an unmanaged + * AM or its state has been saved in state store. * @return the flag which indicates whether this application is safe to - * unregister. + * terminate. */ - boolean isAppSafeToUnregister(); + boolean isAppSafeToTerminate(); /** * Create the external user-facing state of ApplicationMaster from the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index a2fa0e2..ad3f20d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -37,5 +37,4 @@ // Source: RMStateStore APP_NEW_SAVED, APP_UPDATE_SAVED, - APP_REMOVED } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 5a70cc2..052deb8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -111,6 +111,7 @@ private static final AppFinishedTransition FINISHED_TRANSITION = new AppFinishedTransition(); private RMAppState stateBeforeFinalSaving; + private RMAppState stateBeforeKilling; private RMAppEvent eventCausingFinalSaving; private RMAppState targetedFinalState; private RMAppState recoveredFinalState; @@ -166,10 +167,8 @@ new AppRejectedTransition(), RMAppState.FAILED)) .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppEventType.APP_ACCEPTED) - .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, - RMAppEventType.KILL, - new FinalSavingTransition( - new KillAppAndAttemptTransition(), RMAppState.KILLED)) + .addTransition(RMAppState.SUBMITTED, RMAppState.KILLING, + RMAppEventType.KILL,new KillAttemptTransition()) // Transitions from ACCEPTED state .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, @@ -180,10 +179,8 @@ EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING), RMAppEventType.ATTEMPT_FAILED, new AttemptFailedTransition(RMAppState.SUBMITTED)) - .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING, - RMAppEventType.KILL, - new FinalSavingTransition( - new KillAppAndAttemptTransition(), RMAppState.KILLED)) + .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING, + RMAppEventType.KILL,new KillAttemptTransition()) // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, @@ -200,10 +197,8 @@ EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING), RMAppEventType.ATTEMPT_FAILED, new AttemptFailedTransition(RMAppState.SUBMITTED)) - .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, - RMAppEventType.KILL, - new FinalSavingTransition( - new KillAppAndAttemptTransition(), RMAppState.KILLED)) + .addTransition(RMAppState.RUNNING, RMAppState.KILLING, + RMAppEventType.KILL, new KillAttemptTransition()) // Transitions from FINAL_SAVING state .addTransition(RMAppState.FINAL_SAVING, @@ -221,11 +216,24 @@ // Transitions from FINISHING state .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) - .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, - RMAppEventType.KILL, new KillAppAndAttemptTransition()) // ignorable transitions .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, - EnumSet.of(RMAppEventType.NODE_UPDATE)) + EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL)) + + // Transitions from KILLING state + .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, + RMAppEventType.ATTEMPT_KILLED, + new FinalSavingTransition( + new AppKilledTransition(), RMAppState.KILLED)) + .addTransition(RMAppState.KILLING, RMAppState.KILLING, + EnumSet.of( + RMAppEventType.NODE_UPDATE, + RMAppEventType.ATTEMPT_REGISTERED, + RMAppEventType.ATTEMPT_UNREGISTERED, + RMAppEventType.ATTEMPT_FINISHED, + RMAppEventType.ATTEMPT_FAILED, + RMAppEventType.APP_UPDATE_SAVED, + RMAppEventType.KILL)) // Transitions from FINISHED state // ignorable transitions @@ -249,7 +257,7 @@ EnumSet.of(RMAppEventType.APP_ACCEPTED, RMAppEventType.APP_REJECTED, RMAppEventType.KILL, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, - RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE)) + RMAppEventType.NODE_UPDATE)) .installTopology(); @@ -419,6 +427,7 @@ private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) { case ACCEPTED: case RUNNING: case FINAL_SAVING: + case KILLING: return FinalApplicationStatus.UNDEFINED; // finished without a proper final state is the same as failed case FINISHING: @@ -681,7 +690,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { } // No existent attempts means the attempt associated with this app was not - // started or started but not yet saved。 + // started or started but not yet saved. if (app.attempts.isEmpty()) { app.createNewAttempt(true); return RMAppState.SUBMITTED; @@ -812,6 +821,7 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event, diags = getAppAttemptFailedDiagnostics(failedEvent); break; case KILL: + case ATTEMPT_KILLED: diags = getAppKilledDiagnostics(); break; default: @@ -901,7 +911,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { private static class AppKilledTransition extends FinalTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { - app.diagnostics.append("Application killed by user."); + app.diagnostics.append(getAppKilledDiagnostics()); super.transition(app, event); }; } @@ -910,15 +920,16 @@ private static String getAppKilledDiagnostics() { return "Application killed by user."; } - private static class KillAppAndAttemptTransition extends AppKilledTransition { + private static class KillAttemptTransition extends RMAppTransition { @SuppressWarnings("unchecked") @Override public void transition(RMAppImpl app, RMAppEvent event) { - app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(), - RMAppAttemptEventType.KILL)); - super.transition(app, event); + app.stateBeforeKilling = app.getState(); + app.handler.handle(new RMAppAttemptEvent(app.currentAttempt + .getAppAttemptId(), RMAppAttemptEventType.KILL)); } } + private static final class AppRejectedTransition extends FinalTransition{ public void transition(RMAppImpl app, RMAppEvent event) { @@ -986,7 +997,7 @@ public String getApplicationType() { } @Override - public boolean isAppSafeToUnregister() { + public boolean isAppSafeToTerminate() { RMAppState state = getState(); return state.equals(RMAppState.FINISHING) || state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED) @@ -1003,6 +1014,9 @@ public YarnApplicationState createApplicationState() { if (rmAppState.equals(RMAppState.FINAL_SAVING)) { rmAppState = stateBeforeFinalSaving; } + if (rmAppState.equals(RMAppState.KILLING)) { + rmAppState = stateBeforeKilling; + } switch (rmAppState) { case NEW: return YarnApplicationState.NEW; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java index ececdae..bcef85f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java @@ -28,5 +28,6 @@ FINISHING, FINISHED, FAILED, + KILLING, KILLED } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 9944c9c..6945378 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -277,12 +278,10 @@ public void NMwaitForState(NodeId nodeid, NodeState finalState) node.getState()); } - public void killApp(ApplicationId appId) throws Exception { + public KillApplicationResponse killApp(ApplicationId appId) throws Exception { ApplicationClientProtocol client = getClientRMService(); - KillApplicationRequest req = Records - .newRecord(KillApplicationRequest.class); - req.setApplicationId(appId); - client.forceKillApplication(req); + KillApplicationRequest req = KillApplicationRequest.newInstance(appId); + return client.forceKillApplication(req); } // from AMLauncher diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index d396262..9965bdb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -76,8 +77,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -414,10 +416,8 @@ public void testRMRestartAppRunningAMFailed() throws Exception { MockRM rm2 = new MockRM(conf, memStore); rm2.start(); // assert the previous AM state is loaded back on RM recovery. - RMApp recoveredApp = - rm2.getRMContext().getRMApps().get(app0.getApplicationId()); - Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp - .getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState()); + + rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED); rm1.stop(); rm2.stop(); } @@ -964,8 +964,8 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), attemptState.getMasterContainer().getId()); - // Setting AMLivelinessMonitor interval to be 10 Secs. - conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000); + // Setting AMLivelinessMonitor interval to be 3 Secs. + conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 3000); // start new RM MockRM rm2 = new MockRM(conf, memStore); rm2.start(); @@ -1494,6 +1494,69 @@ public synchronized void checkVersion() Assert.assertTrue(rm1.getServiceState() == STATE.STOPPED); } + + // This is to test Killing application should be able to wait until app + // reaches killed state and also check that attempt state is saved before app + // state is saved. + @Test + public void testKillApplicationOnRMRestart() throws Exception { + MemoryRMStateStore memStore = new TestMemoryRMStateStore(); + memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + RMApp app1 = + rm1.submitApp(200, "name", "user", null, false, "default", 1, null, + "myType"); + MockAM am1 = launchAM(app1, rm1, nm1); + + KillApplicationResponse response; + int count = 0; + while (true) { + response = rm1.killApp(app1.getApplicationId()); + if (response.getIsKillCompleted()) { + break; + } + count++; + } + Assert.assertTrue(count > 0); + + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + Assert.assertEquals(1, + ((TestMemoryRMStateStore) memStore).updateAttempt); + Assert + .assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp); + } + + public class TestMemoryRMStateStore extends MemoryRMStateStore { + int count = 0; + public int updateApp = 0; + public int updateAttempt = 0; + + @Override + public void updateApplicationStateInternal(String appId, + ApplicationStateDataPBImpl appStateData) throws Exception { + updateApp = ++count; + super.updateApplicationStateInternal(appId, appStateData); + } + + @Override + public synchronized void + updateApplicationAttemptStateInternal(String attemptIdStr, + ApplicationAttemptStateDataPBImpl attemptStateData) + throws Exception { + updateAttempt = ++count; + super.updateApplicationAttemptStateInternal(attemptIdStr, + attemptStateData); + } + } + public static class TestSecurityMockRM extends MockRM { public TestSecurityMockRM(Configuration conf, RMStateStore store) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index b90c711..aa116bf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -145,7 +145,7 @@ public void setQueue(String name) { } @Override - public boolean isAppSafeToUnregister() { + public boolean isAppSafeToTerminate() { throw new UnsupportedOperationException("Not supported yet."); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index bcb2f6f..debcffe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -218,7 +218,7 @@ public String getApplicationType() { } @Override - public boolean isAppSafeToUnregister() { + public boolean isAppSafeToTerminate() { return true; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 5b68723..58ab563 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -301,12 +301,9 @@ private void assertKilled(RMApp application) { private void assertAppAndAttemptKilled(RMApp application) throws InterruptedException { + sendAttemptUpdateSavedEvent(application); sendAppUpdateSavedEvent(application); assertKilled(application); - // send attempt final state saved event. - application.getCurrentAppAttempt().handle( - new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt() - .getAppAttemptId(), null)); Assert.assertEquals(RMAppAttemptState.KILLED, application .getCurrentAppAttempt().getAppAttemptState()); assertAppFinalStateSaved(application); @@ -329,6 +326,12 @@ private void sendAppUpdateSavedEvent(RMApp application) { rmDispatcher.await(); } + private void sendAttemptUpdateSavedEvent(RMApp application) { + application.getCurrentAppAttempt().handle( + new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt() + .getAppAttemptId(), null)); + } + protected RMApp testCreateAppNewSaving( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = createNewTestApp(submissionContext); @@ -624,11 +627,12 @@ public void testAppRunningKill() throws IOException { rmDispatcher.await(); // Ignore Attempt_Finished if we were supposed to go to Finished. - assertAppState(RMAppState.FINAL_SAVING, application); + assertAppState(RMAppState.KILLING, application); RMAppEvent finishEvent = new RMAppFinishedAttemptEvent(application.getApplicationId(), null); application.handle(finishEvent); - assertAppState(RMAppState.FINAL_SAVING, application); + assertAppState(RMAppState.KILLING, application); + sendAttemptUpdateSavedEvent(application); sendAppUpdateSavedEvent(application); assertKilled(application); } @@ -685,19 +689,6 @@ public void testAppRunningFailed() throws IOException { assertFailed(application, ".*Failing the application.*"); } - @Test - public void testAppFinishingKill() throws IOException { - LOG.info("--- START: testAppFinishedFinished ---"); - - RMApp application = testCreateAppFinishing(null); - // FINISHING => FINISHED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); - application.handle(event); - rmDispatcher.await(); - assertAppState(RMAppState.FINISHED, application); - } - // While App is at FINAL_SAVING, Attempt_Finished event may come before // App_Saved event, we stay on FINAL_SAVING on Attempt_Finished event // and then directly jump from FINAL_SAVING to FINISHED state on App_Saved @@ -780,6 +771,7 @@ public void testAppKilledKilled() throws IOException { new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); application.handle(event); rmDispatcher.await(); + sendAttemptUpdateSavedEvent(application); sendAppUpdateSavedEvent(application); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); @@ -801,14 +793,6 @@ public void testAppKilledKilled() throws IOException { assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); - // KILLED => KILLED event RMAppEventType.ATTEMPT_KILLED - event = - new RMAppEvent(application.getApplicationId(), - RMAppEventType.ATTEMPT_KILLED); - application.handle(event); - rmDispatcher.await(); - assertTimesAtFinish(application); - assertAppState(RMAppState.KILLED, application); // KILLED => KILLED event RMAppEventType.KILL event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);