diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java
index 71aa28b..77bb71d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java
@@ -26,10 +26,21 @@
import org.apache.hadoop.yarn.util.Records;
/**
- *
The response sent by the ResourceManager to the client
- * aborting a submitted application.
- *
- * Currently it's empty.
+ *
+ * The response sent by the ResourceManager to the client aborting
+ * a submitted application.
+ *
+ *
+ * The response, includes:
+ *
+ * - A flag which indicates that the process of killing the application is
+ * completed or not.
+ *
+ * Note: user is recommended to wait until this flag becomes true, otherwise if
+ * the ResourceManager crashes before the process of killing the
+ * application is completed, the ResourceManager may retry this
+ * application on recovery.
+ *
*
* @see ApplicationClientProtocol#forceKillApplication(KillApplicationRequest)
*/
@@ -38,9 +49,24 @@
public abstract class KillApplicationResponse {
@Private
@Unstable
- public static KillApplicationResponse newInstance() {
+ public static KillApplicationResponse newInstance(boolean isKillCompleted) {
KillApplicationResponse response =
Records.newRecord(KillApplicationResponse.class);
+ response.setIsKillCompleted(isKillCompleted);
return response;
}
+
+ /**
+ * Get the flag which indicates that the process of killing application is completed or not.
+ */
+ @Public
+ @Stable
+ public abstract boolean getIsKillCompleted();
+
+ /**
+ * Set the flag which indicates that the process of killing application is completed or not.
+ */
+ @Private
+ @Unstable
+ public abstract void setIsKillCompleted(boolean isKillCompleted);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 839765c..ef6e01b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -26,7 +26,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
@@ -879,15 +878,23 @@
////////////////////////////////
/**
+ * Use YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS instead.
* The interval of the yarn client's querying application state after
* application submission. The unit is millisecond.
*/
+ @Deprecated
public static final String YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS =
YARN_PREFIX + "client.app-submission.poll-interval";
- public static final long DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS =
- 1000;
/**
+ * The interval that the yarn client library uses to poll the completion
+ * status of the asynchronous API of application client protocol.
+ */
+ public static final String YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS =
+ YARN_PREFIX + "client.application-client-protocol.poll-interval-ms";
+ public static final long DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS =
+ 200;
+ /**
* Max number of threads in NMClientAsync to process container management
* events
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 332be81..a4631d1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -116,6 +116,7 @@ message KillApplicationRequestProto {
}
message KillApplicationResponseProto {
+ optional bool is_kill_completed = 1 [default = false];
}
message GetClusterMetricsRequestProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index d35e1a4..7c44604 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -79,7 +80,8 @@
protected ApplicationClientProtocol rmClient;
protected InetSocketAddress rmAddress;
- protected long statePollIntervalMillis;
+ protected long submitPollIntervalMillis;
+ private long asyncApiPollIntervalMillis;
private static final String ROOT = "root";
@@ -92,12 +94,20 @@ private static InetSocketAddress getRmAddress(Configuration conf) {
YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
}
+ @SuppressWarnings("deprecation")
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.rmAddress = getRmAddress(conf);
- statePollIntervalMillis = conf.getLong(
+ asyncApiPollIntervalMillis =
+ conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
+ submitPollIntervalMillis = asyncApiPollIntervalMillis;
+ if (conf.get(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS)
+ != null) {
+ submitPollIntervalMillis = conf.getLong(
YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
- YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS);
+ YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
+ }
super.serviceInit(conf);
}
@@ -165,7 +175,7 @@ public YarnClientApplication createApplication()
" is still in " + state);
}
try {
- Thread.sleep(statePollIntervalMillis);
+ Thread.sleep(submitPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
@@ -179,11 +189,29 @@ public YarnClientApplication createApplication()
@Override
public void killApplication(ApplicationId applicationId)
throws YarnException, IOException {
- LOG.info("Killing application " + applicationId);
KillApplicationRequest request =
Records.newRecord(KillApplicationRequest.class);
request.setApplicationId(applicationId);
- rmClient.forceKillApplication(request);
+
+ try {
+ int pollCount = 0;
+ while (true) {
+ KillApplicationResponse response =
+ rmClient.forceKillApplication(request);
+ if (response.getIsKillCompleted()) {
+ break;
+ }
+ if (++pollCount % 10 == 0) {
+ LOG.info("Watiting for application " + applicationId
+ + " to be killed.");
+ }
+ Thread.sleep(asyncApiPollIntervalMillis);
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for application " + applicationId
+ + " to be killed.");
+ }
+ LOG.info("Killed application " + applicationId);
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 826433d..72b5b28c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -89,6 +89,7 @@ public void testClientStop() {
rm.stop();
}
+ @SuppressWarnings("deprecation")
@Test (timeout = 30000)
public void testSubmitApplication() {
Configuration conf = new Configuration();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java
index 14e0c1f..139c2a5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java
@@ -22,7 +22,9 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProtoOrBuilder;
import com.google.protobuf.TextFormat;
@@ -67,4 +69,24 @@ public boolean equals(Object other) {
public String toString() {
return TextFormat.shortDebugString(getProto());
}
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = KillApplicationResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public boolean getIsKillCompleted() {
+ KillApplicationResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getIsKillCompleted();
+ }
+
+ @Override
+ public void setIsKillCompleted(boolean isKillCompleted) {
+ maybeInitBuilder();
+ builder.setIsKillCompleted(isKillCompleted);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index c43dc1a..9673826 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -945,10 +945,10 @@
- The interval of the yarn client's querying application state
- after application submission. The unit is millisecond.
- yarn.client.app-submission.poll-interval
- 1000
+ The interval that the yarn client library uses to poll the
+ completion status of the asynchronous API of application client protocol.
+
+ yarn.client.application-client-protocol.poll-interval-ms
+ 200
-
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index f070f28..16091c6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -292,15 +292,14 @@ public FinishApplicationMasterResponse finishApplicationMaster(
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
- .getTrackingUrl(), request.getFinalApplicationStatus(), request
- .getDiagnostics()));
-
if (rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
- .isAppSafeToUnregister()) {
+ .isAppSafeToTerminate()) {
return FinishApplicationMasterResponse.newInstance(true);
} else {
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
+ .getTrackingUrl(), request.getFinalApplicationStatus(), request
+ .getDiagnostics()));
return FinishApplicationMasterResponse.newInstance(false);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index f0e8553..cd2226f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -380,14 +380,15 @@ public KillApplicationResponse forceKillApplication(
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
}
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, RMAppEventType.KILL));
-
- RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
- AuditConstants.KILL_APP_REQUEST, "ClientRMService" , applicationId);
- KillApplicationResponse response = recordFactory
- .newRecordInstance(KillApplicationResponse.class);
- return response;
+ if (application.isAppSafeToTerminate()) {
+ RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
+ AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId);
+ return KillApplicationResponse.newInstance(true);
+ } else {
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(applicationId, RMAppEventType.KILL));
+ return KillApplicationResponse.newInstance(false);
+ }
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index fadaa3b..1809a4b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -197,13 +197,13 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
String getApplicationType();
/**
- * Check whether this application is safe to unregister.
- * An application is deemed to be safe to unregister if it is an unmanaged
- * AM or its state has been removed from state store.
+ * Check whether this application is safe to terminate.
+ * An application is deemed to be safe to terminate if it is an unmanaged
+ * AM or its state has been saved in state store.
* @return the flag which indicates whether this application is safe to
- * unregister.
+ * terminate.
*/
- boolean isAppSafeToUnregister();
+ boolean isAppSafeToTerminate();
/**
* Create the external user-facing state of ApplicationMaster from the
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
index a2fa0e2..ad3f20d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
@@ -37,5 +37,4 @@
// Source: RMStateStore
APP_NEW_SAVED,
APP_UPDATE_SAVED,
- APP_REMOVED
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 5a70cc2..052deb8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -111,6 +111,7 @@
private static final AppFinishedTransition FINISHED_TRANSITION =
new AppFinishedTransition();
private RMAppState stateBeforeFinalSaving;
+ private RMAppState stateBeforeKilling;
private RMAppEvent eventCausingFinalSaving;
private RMAppState targetedFinalState;
private RMAppState recoveredFinalState;
@@ -166,10 +167,8 @@
new AppRejectedTransition(), RMAppState.FAILED))
.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
RMAppEventType.APP_ACCEPTED)
- .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
- RMAppEventType.KILL,
- new FinalSavingTransition(
- new KillAppAndAttemptTransition(), RMAppState.KILLED))
+ .addTransition(RMAppState.SUBMITTED, RMAppState.KILLING,
+ RMAppEventType.KILL,new KillAttemptTransition())
// Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
@@ -180,10 +179,8 @@
EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.SUBMITTED))
- .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
- RMAppEventType.KILL,
- new FinalSavingTransition(
- new KillAppAndAttemptTransition(), RMAppState.KILLED))
+ .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
+ RMAppEventType.KILL,new KillAttemptTransition())
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -200,10 +197,8 @@
EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.SUBMITTED))
- .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
- RMAppEventType.KILL,
- new FinalSavingTransition(
- new KillAppAndAttemptTransition(), RMAppState.KILLED))
+ .addTransition(RMAppState.RUNNING, RMAppState.KILLING,
+ RMAppEventType.KILL, new KillAttemptTransition())
// Transitions from FINAL_SAVING state
.addTransition(RMAppState.FINAL_SAVING,
@@ -221,11 +216,24 @@
// Transitions from FINISHING state
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
- .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
- RMAppEventType.KILL, new KillAppAndAttemptTransition())
// ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
- EnumSet.of(RMAppEventType.NODE_UPDATE))
+ EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL))
+
+ // Transitions from KILLING state
+ .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
+ RMAppEventType.ATTEMPT_KILLED,
+ new FinalSavingTransition(
+ new AppKilledTransition(), RMAppState.KILLED))
+ .addTransition(RMAppState.KILLING, RMAppState.KILLING,
+ EnumSet.of(
+ RMAppEventType.NODE_UPDATE,
+ RMAppEventType.ATTEMPT_REGISTERED,
+ RMAppEventType.ATTEMPT_UNREGISTERED,
+ RMAppEventType.ATTEMPT_FINISHED,
+ RMAppEventType.ATTEMPT_FAILED,
+ RMAppEventType.APP_UPDATE_SAVED,
+ RMAppEventType.KILL))
// Transitions from FINISHED state
// ignorable transitions
@@ -249,7 +257,7 @@
EnumSet.of(RMAppEventType.APP_ACCEPTED,
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
- RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE))
+ RMAppEventType.NODE_UPDATE))
.installTopology();
@@ -419,6 +427,7 @@ private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) {
case ACCEPTED:
case RUNNING:
case FINAL_SAVING:
+ case KILLING:
return FinalApplicationStatus.UNDEFINED;
// finished without a proper final state is the same as failed
case FINISHING:
@@ -681,7 +690,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
}
// No existent attempts means the attempt associated with this app was not
- // started or started but not yet saved。
+ // started or started but not yet saved.
if (app.attempts.isEmpty()) {
app.createNewAttempt(true);
return RMAppState.SUBMITTED;
@@ -812,6 +821,7 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event,
diags = getAppAttemptFailedDiagnostics(failedEvent);
break;
case KILL:
+ case ATTEMPT_KILLED:
diags = getAppKilledDiagnostics();
break;
default:
@@ -901,7 +911,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
private static class AppKilledTransition extends FinalTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
- app.diagnostics.append("Application killed by user.");
+ app.diagnostics.append(getAppKilledDiagnostics());
super.transition(app, event);
};
}
@@ -910,15 +920,16 @@ private static String getAppKilledDiagnostics() {
return "Application killed by user.";
}
- private static class KillAppAndAttemptTransition extends AppKilledTransition {
+ private static class KillAttemptTransition extends RMAppTransition {
@SuppressWarnings("unchecked")
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
- app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(),
- RMAppAttemptEventType.KILL));
- super.transition(app, event);
+ app.stateBeforeKilling = app.getState();
+ app.handler.handle(new RMAppAttemptEvent(app.currentAttempt
+ .getAppAttemptId(), RMAppAttemptEventType.KILL));
}
}
+
private static final class AppRejectedTransition extends
FinalTransition{
public void transition(RMAppImpl app, RMAppEvent event) {
@@ -986,7 +997,7 @@ public String getApplicationType() {
}
@Override
- public boolean isAppSafeToUnregister() {
+ public boolean isAppSafeToTerminate() {
RMAppState state = getState();
return state.equals(RMAppState.FINISHING)
|| state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
@@ -1003,6 +1014,9 @@ public YarnApplicationState createApplicationState() {
if (rmAppState.equals(RMAppState.FINAL_SAVING)) {
rmAppState = stateBeforeFinalSaving;
}
+ if (rmAppState.equals(RMAppState.KILLING)) {
+ rmAppState = stateBeforeKilling;
+ }
switch (rmAppState) {
case NEW:
return YarnApplicationState.NEW;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
index ececdae..bcef85f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
@@ -28,5 +28,6 @@
FINISHING,
FINISHED,
FAILED,
+ KILLING,
KILLED
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 9944c9c..6945378 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -277,12 +278,10 @@ public void NMwaitForState(NodeId nodeid, NodeState finalState)
node.getState());
}
- public void killApp(ApplicationId appId) throws Exception {
+ public KillApplicationResponse killApp(ApplicationId appId) throws Exception {
ApplicationClientProtocol client = getClientRMService();
- KillApplicationRequest req = Records
- .newRecord(KillApplicationRequest.class);
- req.setApplicationId(appId);
- client.forceKillApplication(req);
+ KillApplicationRequest req = KillApplicationRequest.newInstance(appId);
+ return client.forceKillApplication(req);
}
// from AMLauncher
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index d396262..9965bdb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -54,6 +54,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -76,8 +77,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -414,10 +416,8 @@ public void testRMRestartAppRunningAMFailed() throws Exception {
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
// assert the previous AM state is loaded back on RM recovery.
- RMApp recoveredApp =
- rm2.getRMContext().getRMApps().get(app0.getApplicationId());
- Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp
- .getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState());
+
+ rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
rm1.stop();
rm2.stop();
}
@@ -964,8 +964,8 @@ public void testRMRestartOnMaxAppAttempts() throws Exception {
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
- // Setting AMLivelinessMonitor interval to be 10 Secs.
- conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
+ // Setting AMLivelinessMonitor interval to be 3 Secs.
+ conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 3000);
// start new RM
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
@@ -1494,6 +1494,69 @@ public synchronized void checkVersion()
Assert.assertTrue(rm1.getServiceState() == STATE.STOPPED);
}
+
+ // This is to test Killing application should be able to wait until app
+ // reaches killed state and also check that attempt state is saved before app
+ // state is saved.
+ @Test
+ public void testKillApplicationOnRMRestart() throws Exception {
+ MemoryRMStateStore memStore = new TestMemoryRMStateStore();
+ memStore.init(conf);
+
+ // start RM
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ RMApp app1 =
+ rm1.submitApp(200, "name", "user", null, false, "default", 1, null,
+ "myType");
+ MockAM am1 = launchAM(app1, rm1, nm1);
+
+ KillApplicationResponse response;
+ int count = 0;
+ while (true) {
+ response = rm1.killApp(app1.getApplicationId());
+ if (response.getIsKillCompleted()) {
+ break;
+ }
+ count++;
+ }
+ Assert.assertTrue(count > 0);
+
+ rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED);
+ rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+ Assert.assertEquals(1,
+ ((TestMemoryRMStateStore) memStore).updateAttempt);
+ Assert
+ .assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp);
+ }
+
+ public class TestMemoryRMStateStore extends MemoryRMStateStore {
+ int count = 0;
+ public int updateApp = 0;
+ public int updateAttempt = 0;
+
+ @Override
+ public void updateApplicationStateInternal(String appId,
+ ApplicationStateDataPBImpl appStateData) throws Exception {
+ updateApp = ++count;
+ super.updateApplicationStateInternal(appId, appStateData);
+ }
+
+ @Override
+ public synchronized void
+ updateApplicationAttemptStateInternal(String attemptIdStr,
+ ApplicationAttemptStateDataPBImpl attemptStateData)
+ throws Exception {
+ updateAttempt = ++count;
+ super.updateApplicationAttemptStateInternal(attemptIdStr,
+ attemptStateData);
+ }
+ }
+
public static class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index b90c711..aa116bf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -145,7 +145,7 @@ public void setQueue(String name) {
}
@Override
- public boolean isAppSafeToUnregister() {
+ public boolean isAppSafeToTerminate() {
throw new UnsupportedOperationException("Not supported yet.");
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index bcb2f6f..debcffe 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -218,7 +218,7 @@ public String getApplicationType() {
}
@Override
- public boolean isAppSafeToUnregister() {
+ public boolean isAppSafeToTerminate() {
return true;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index 5b68723..58ab563 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -301,12 +301,9 @@ private void assertKilled(RMApp application) {
private void assertAppAndAttemptKilled(RMApp application)
throws InterruptedException {
+ sendAttemptUpdateSavedEvent(application);
sendAppUpdateSavedEvent(application);
assertKilled(application);
- // send attempt final state saved event.
- application.getCurrentAppAttempt().handle(
- new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
- .getAppAttemptId(), null));
Assert.assertEquals(RMAppAttemptState.KILLED, application
.getCurrentAppAttempt().getAppAttemptState());
assertAppFinalStateSaved(application);
@@ -329,6 +326,12 @@ private void sendAppUpdateSavedEvent(RMApp application) {
rmDispatcher.await();
}
+ private void sendAttemptUpdateSavedEvent(RMApp application) {
+ application.getCurrentAppAttempt().handle(
+ new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
+ .getAppAttemptId(), null));
+ }
+
protected RMApp testCreateAppNewSaving(
ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = createNewTestApp(submissionContext);
@@ -624,11 +627,12 @@ public void testAppRunningKill() throws IOException {
rmDispatcher.await();
// Ignore Attempt_Finished if we were supposed to go to Finished.
- assertAppState(RMAppState.FINAL_SAVING, application);
+ assertAppState(RMAppState.KILLING, application);
RMAppEvent finishEvent =
new RMAppFinishedAttemptEvent(application.getApplicationId(), null);
application.handle(finishEvent);
- assertAppState(RMAppState.FINAL_SAVING, application);
+ assertAppState(RMAppState.KILLING, application);
+ sendAttemptUpdateSavedEvent(application);
sendAppUpdateSavedEvent(application);
assertKilled(application);
}
@@ -685,19 +689,6 @@ public void testAppRunningFailed() throws IOException {
assertFailed(application, ".*Failing the application.*");
}
- @Test
- public void testAppFinishingKill() throws IOException {
- LOG.info("--- START: testAppFinishedFinished ---");
-
- RMApp application = testCreateAppFinishing(null);
- // FINISHING => FINISHED event RMAppEventType.KILL
- RMAppEvent event =
- new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
- application.handle(event);
- rmDispatcher.await();
- assertAppState(RMAppState.FINISHED, application);
- }
-
// While App is at FINAL_SAVING, Attempt_Finished event may come before
// App_Saved event, we stay on FINAL_SAVING on Attempt_Finished event
// and then directly jump from FINAL_SAVING to FINISHED state on App_Saved
@@ -780,6 +771,7 @@ public void testAppKilledKilled() throws IOException {
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
application.handle(event);
rmDispatcher.await();
+ sendAttemptUpdateSavedEvent(application);
sendAppUpdateSavedEvent(application);
assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application);
@@ -801,14 +793,6 @@ public void testAppKilledKilled() throws IOException {
assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application);
- // KILLED => KILLED event RMAppEventType.ATTEMPT_KILLED
- event =
- new RMAppEvent(application.getApplicationId(),
- RMAppEventType.ATTEMPT_KILLED);
- application.handle(event);
- rmDispatcher.await();
- assertTimesAtFinish(application);
- assertAppState(RMAppState.KILLED, application);
// KILLED => KILLED event RMAppEventType.KILL
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);