diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java index 71aa28b..2b2a5cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/KillApplicationResponse.java @@ -26,10 +26,20 @@ import org.apache.hadoop.yarn.util.Records; /** - *
The response sent by the ResourceManager to the client
- * aborting a submitted application.
Currently it's empty.
+ *
+ * The response sent by the ResourceManager to the client aborting
+ * a submitted application.
+ *
+ * The response, includes: + *
ResourceManager crashes before the application is
+ * successfully killed, the ResourceManager may retry this
+ * application on recovery.
+ *
*
* @see ApplicationClientProtocol#forceKillApplication(KillApplicationRequest)
*/
@@ -38,9 +48,24 @@
public abstract class KillApplicationResponse {
@Private
@Unstable
- public static KillApplicationResponse newInstance() {
+ public static KillApplicationResponse newInstance(boolean isKilled) {
KillApplicationResponse response =
Records.newRecord(KillApplicationResponse.class);
+ response.setIsKilled(isKilled);
return response;
}
+
+ /**
+ * Get the flag which indicates that the application is successfully killed.
+ */
+ @Public
+ @Stable
+ public abstract boolean getIsKilled();
+
+ /**
+ * Set the flag which indicates that the application is successfully killed.
+ */
+ @Private
+ @Unstable
+ public abstract void setIsKilled(boolean isKilled);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index cfe71d4..7321f78 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -113,6 +113,7 @@ message KillApplicationRequestProto {
}
message KillApplicationResponseProto {
+ optional bool isKilled = 1 [default = false];
}
message GetClusterMetricsRequestProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index d35e1a4..2ad1f2f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -179,11 +180,30 @@ public YarnClientApplication createApplication()
@Override
public void killApplication(ApplicationId applicationId)
throws YarnException, IOException {
- LOG.info("Killing application " + applicationId);
KillApplicationRequest request =
Records.newRecord(KillApplicationRequest.class);
request.setApplicationId(applicationId);
- rmClient.forceKillApplication(request);
+
+ try {
+ int pollCount = 0;
+ while (true) {
+ KillApplicationResponse response =
+ rmClient.forceKillApplication(request);
+ if (response.getIsKilled()) {
+ break;
+ }
+ if (++pollCount % 10 == 0) {
+ LOG.info("Watiting for application " + applicationId
+ + " to be killed.");
+ }
+ Thread.sleep(100);
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for application " + applicationId
+ + " to be killed.");
+ return;
+ }
+ LOG.info("Killed application " + applicationId);
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java
index 14e0c1f..6ffcd3e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/KillApplicationResponsePBImpl.java
@@ -22,7 +22,9 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProtoOrBuilder;
import com.google.protobuf.TextFormat;
@@ -67,4 +69,24 @@ public boolean equals(Object other) {
public String toString() {
return TextFormat.shortDebugString(getProto());
}
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = KillApplicationResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public boolean getIsKilled() {
+ KillApplicationResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getIsKilled();
+ }
+
+ @Override
+ public void setIsKilled(boolean isKilled) {
+ maybeInitBuilder();
+ builder.setIsKilled(isKilled);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index f070f28..705b212 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -298,7 +298,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(
.getDiagnostics()));
if (rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
- .isAppSafeToUnregister()) {
+ .isAppSafeToTerminate()) {
return FinishApplicationMasterResponse.newInstance(true);
} else {
return FinishApplicationMasterResponse.newInstance(false);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index c3410a9..42277e6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -87,8 +87,9 @@
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -377,14 +378,17 @@ public KillApplicationResponse forceKillApplication(
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
}
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, RMAppEventType.KILL));
+ if (application.isAppSafeToTerminate()) {
+ return KillApplicationResponse.newInstance(true);
+ }
- RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
- AuditConstants.KILL_APP_REQUEST, "ClientRMService" , applicationId);
- KillApplicationResponse response = recordFactory
- .newRecordInstance(KillApplicationResponse.class);
- return response;
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppAttemptEvent(application.getCurrentAppAttempt()
+ .getAppAttemptId(), RMAppAttemptEventType.KILL));
+
+ RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
+ AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId);
+ return KillApplicationResponse.newInstance(false);
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index fadaa3b..1809a4b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -197,13 +197,13 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
String getApplicationType();
/**
- * Check whether this application is safe to unregister.
- * An application is deemed to be safe to unregister if it is an unmanaged
- * AM or its state has been removed from state store.
+ * Check whether this application is safe to terminate.
+ * An application is deemed to be safe to terminate if it is an unmanaged
+ * AM or its state has been saved in state store.
* @return the flag which indicates whether this application is safe to
- * unregister.
+ * terminate.
*/
- boolean isAppSafeToUnregister();
+ boolean isAppSafeToTerminate();
/**
* Create the external user-facing state of ApplicationMaster from the
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
index a2fa0e2..0d7351e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
@@ -22,7 +22,6 @@
// Source: ClientRMService
START,
RECOVER,
- KILL,
// Source: RMAppAttempt
APP_REJECTED,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 76d59ec..cc1ac9f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -135,9 +135,10 @@
RMAppState.RUNNING, RMAppState.FINISHED, RMAppState.FAILED,
RMAppState.KILLED, RMAppState.FINAL_SAVING),
RMAppEventType.RECOVER, new RMAppRecoveredTransition())
- .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL,
+ .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
+ RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition(
- new AppKilledTransition(), RMAppState.KILLED))
+ new AttemptKilledTransition(), RMAppState.KILLED))
.addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(
@@ -149,9 +150,9 @@
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
RMAppEventType.APP_NEW_SAVED, new StartAppAttemptTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
- RMAppEventType.KILL,
+ RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition(
- new AppKilledTransition(), RMAppState.KILLED))
+ new AttemptKilledTransition(), RMAppState.KILLED))
.addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(new AppRejectedTransition(),
@@ -167,9 +168,9 @@
.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
RMAppEventType.APP_ACCEPTED)
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
- RMAppEventType.KILL,
+ RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition(
- new KillAppAndAttemptTransition(), RMAppState.KILLED))
+ new AttemptKilledTransition(), RMAppState.KILLED))
// Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
@@ -181,9 +182,9 @@
RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.SUBMITTED))
.addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
- RMAppEventType.KILL,
+ RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition(
- new KillAppAndAttemptTransition(), RMAppState.KILLED))
+ new AttemptKilledTransition(), RMAppState.KILLED))
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -201,9 +202,9 @@
RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.SUBMITTED))
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
- RMAppEventType.KILL,
+ RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition(
- new KillAppAndAttemptTransition(), RMAppState.KILLED))
+ new AttemptKilledTransition(), RMAppState.KILLED))
// Transitions from FINAL_SAVING state
.addTransition(RMAppState.FINAL_SAVING,
@@ -215,17 +216,15 @@
new AttemptFinishedAtFinalSavingTransition())
// ignorable transitions
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
- EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
+ EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.ATTEMPT_KILLED,
RMAppEventType.APP_NEW_SAVED))
// Transitions from FINISHING state
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
- .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
- RMAppEventType.KILL, new KillAppAndAttemptTransition())
// ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
- EnumSet.of(RMAppEventType.NODE_UPDATE))
+ EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.ATTEMPT_KILLED))
// Transitions from FINISHED state
// ignorable transitions
@@ -234,12 +233,12 @@
RMAppEventType.NODE_UPDATE,
RMAppEventType.ATTEMPT_UNREGISTERED,
RMAppEventType.ATTEMPT_FINISHED,
- RMAppEventType.KILL))
+ RMAppEventType.ATTEMPT_KILLED))
// Transitions from FAILED state
// ignorable transitions
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
- EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
+ EnumSet.of(RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE))
// Transitions from KILLED state
// ignorable transitions
@@ -247,7 +246,7 @@
RMAppState.KILLED,
RMAppState.KILLED,
EnumSet.of(RMAppEventType.APP_ACCEPTED,
- RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
+ RMAppEventType.APP_REJECTED,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE))
@@ -809,7 +808,7 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event,
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
diags = getAppAttemptFailedDiagnostics(failedEvent);
break;
- case KILL:
+ case ATTEMPT_KILLED:
diags = getAppKilledDiagnostics();
break;
default:
@@ -896,7 +895,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
}
- private static class AppKilledTransition extends FinalTransition {
+ private static class AttemptKilledTransition extends FinalTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.diagnostics.append("Application killed by user.");
@@ -908,15 +907,6 @@ private static String getAppKilledDiagnostics() {
return "Application killed by user.";
}
- private static class KillAppAndAttemptTransition extends AppKilledTransition {
- @SuppressWarnings("unchecked")
- @Override
- public void transition(RMAppImpl app, RMAppEvent event) {
- app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(),
- RMAppAttemptEventType.KILL));
- super.transition(app, event);
- }
- }
private static final class AppRejectedTransition extends
FinalTransition{
public void transition(RMAppImpl app, RMAppEvent event) {
@@ -984,9 +974,12 @@ public String getApplicationType() {
}
@Override
- public boolean isAppSafeToUnregister() {
+ public boolean isAppSafeToTerminate() {
RMAppState state = getState();
- return state.equals(RMAppState.FINISHING)
+ boolean isRecoveryEnabled =
+ conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
+ YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
+ return !isRecoveryEnabled || state.equals(RMAppState.FINISHING)
|| state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
|| state.equals(RMAppState.KILLED) ||
// If this is an unmanaged AM, we are safe to unregister since unmanaged
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 78ca7d1..472c055 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -277,12 +278,10 @@ public void NMwaitForState(NodeId nodeid, NodeState finalState)
node.getState());
}
- public void killApp(ApplicationId appId) throws Exception {
+ public KillApplicationResponse killApp(ApplicationId appId) throws Exception {
ApplicationClientProtocol client = getClientRMService();
- KillApplicationRequest req = Records
- .newRecord(KillApplicationRequest.class);
- req.setApplicationId(appId);
- client.forceKillApplication(req);
+ KillApplicationRequest req = KillApplicationRequest.newInstance(appId);
+ return client.forceKillApplication(req);
}
// from AMLauncher
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index 8fe8de4..63c36de 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -68,7 +68,7 @@
*/
public class TestAppManager{
- private static RMAppEventType appEventType = RMAppEventType.KILL;
+ private static RMAppEventType appEventType = RMAppEventType.ATTEMPT_KILLED;
public synchronized RMAppEventType getAppEventType() {
return appEventType;
@@ -216,7 +216,7 @@ public void setUp() {
@After
public void tearDown() {
- setAppEventType(RMAppEventType.KILL);
+ setAppEventType(RMAppEventType.ATTEMPT_KILLED);
((Service)rmContext.getDispatcher()).stop();
}
@@ -370,7 +370,8 @@ protected void setupDispatcher(RMContext rmContext, Configuration conf) {
rmContext.getDispatcher().register(RMAppManagerEventType.class, testAppManagerDispatcher);
((Service)rmContext.getDispatcher()).init(conf);
((Service)rmContext.getDispatcher()).start();
- Assert.assertEquals("app event type is wrong before", RMAppEventType.KILL, appEventType);
+ Assert.assertEquals("app event type is wrong before",
+ RMAppEventType.ATTEMPT_KILLED, appEventType);
}
@Test
@@ -383,7 +384,7 @@ public void testRMAppSubmit() throws Exception {
// wait for event to be processed
int timeoutSecs = 0;
- while ((getAppEventType() == RMAppEventType.KILL) &&
+ while ((getAppEventType() == RMAppEventType.ATTEMPT_KILLED) &&
timeoutSecs++ < 20) {
Thread.sleep(1000);
}
@@ -423,11 +424,11 @@ public void testRMAppSubmitMaxAppAttempts() throws Exception {
// wait for event to be processed
int timeoutSecs = 0;
- while ((getAppEventType() == RMAppEventType.KILL) &&
+ while ((getAppEventType() == RMAppEventType.ATTEMPT_KILLED) &&
timeoutSecs++ < 20) {
Thread.sleep(1000);
}
- setAppEventType(RMAppEventType.KILL);
+ setAppEventType(RMAppEventType.ATTEMPT_KILLED);
}
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 9e14566..9593130 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -34,7 +34,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -55,6 +54,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -68,9 +68,6 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -80,6 +77,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -93,7 +92,6 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.mortbay.log.Log;
public class TestRMRestart {
@@ -917,8 +915,8 @@ public void testRMRestartOnMaxAppAttempts() throws Exception {
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
- // Setting AMLivelinessMonitor interval to be 10 Secs.
- conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
+ // Setting AMLivelinessMonitor interval to be 3 Secs.
+ conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 3000);
// start new RM
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
@@ -1376,6 +1374,69 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
Assert.assertTrue(rmAppState.size() == NUM_APPS);
}
+ // This is to test Killing application should be able to wait until app
+ // reaches killed state and also check that attempt state is saved before app
+ // state is saved.
+ @Test
+ public void testKillApplicationOnRMRestart() throws Exception {
+ MemoryRMStateStore memStore = new TestKillAppMemoryRMStateStore();
+ memStore.init(conf);
+
+ // start RM
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ RMApp app1 =
+ rm1.submitApp(200, "name", "user", null, false, "default", 1, null,
+ "myType");
+ MockAM am1 = launchAM(app1, rm1, nm1);
+
+ KillApplicationResponse response;
+ int count = 0;
+ while (true) {
+ response = rm1.killApp(app1.getApplicationId());
+ if (response.getIsKilled()) {
+ break;
+ }
+ count++;
+ }
+ Assert.assertTrue(count > 0);
+
+ rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED);
+ rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+ Assert.assertEquals(1,
+ ((TestKillAppMemoryRMStateStore) memStore).updateAttempt);
+ Assert
+ .assertEquals(2, ((TestKillAppMemoryRMStateStore) memStore).updateApp);
+ }
+
+ public class TestKillAppMemoryRMStateStore extends MemoryRMStateStore {
+ int count = 0;
+ public int updateApp = 0;
+ public int updateAttempt = 0;
+
+ @Override
+ public void updateApplicationStateInternal(String appId,
+ ApplicationStateDataPBImpl appStateData) throws Exception {
+ updateApp = ++count;
+ super.updateApplicationStateInternal(appId, appStateData);
+ }
+
+ @Override
+ public synchronized void
+ updateApplicationAttemptStateInternal(String attemptIdStr,
+ ApplicationAttemptStateDataPBImpl attemptStateData)
+ throws Exception {
+ updateAttempt = ++count;
+ super.updateApplicationAttemptStateInternal(attemptIdStr,
+ attemptStateData);
+ }
+ }
+
+
public static class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index b90c711..aa116bf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -145,7 +145,7 @@ public void setQueue(String name) {
}
@Override
- public boolean isAppSafeToUnregister() {
+ public boolean isAppSafeToTerminate() {
throw new UnsupportedOperationException("Not supported yet.");
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index bcb2f6f..debcffe 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -218,7 +218,7 @@ public String getApplicationType() {
}
@Override
- public boolean isAppSafeToUnregister() {
+ public boolean isAppSafeToTerminate() {
return true;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index e3e0e69..06884cc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -229,6 +229,7 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext)
testAppStartState(applicationId, user, name, queue, application);
this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(),
application);
+ rmDispatcher.await();
return application;
}
@@ -296,18 +297,6 @@ private void assertKilled(RMApp application) {
StringBuilder diag = application.getDiagnostics();
Assert.assertEquals("application diagnostics is not correct",
"Application killed by user.", diag.toString());
- }
-
- private void assertAppAndAttemptKilled(RMApp application)
- throws InterruptedException {
- sendAppUpdateSavedEvent(application);
- assertKilled(application);
- // send attempt final state saved event.
- application.getCurrentAppAttempt().handle(
- new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
- .getAppAttemptId(), null));
- Assert.assertEquals(RMAppAttemptState.KILLED, application
- .getCurrentAppAttempt().getAppAttemptState());
assertAppFinalStateSaved(application);
}
@@ -328,6 +317,18 @@ private void sendAppUpdateSavedEvent(RMApp application) {
rmDispatcher.await();
}
+ private void killAppAndAttempt(RMApp application) {
+ // Kill the attempt which leads the the application to be killed.
+ application.getCurrentAppAttempt().handle(
+ new RMAppAttemptEvent(application.getCurrentAppAttempt()
+ .getAppAttemptId(), RMAppAttemptEventType.KILL));
+ application.getCurrentAppAttempt().handle(
+ new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
+ .getAppAttemptId(), null));
+ Assert.assertEquals(RMAppAttemptState.KILLED, application
+ .getCurrentAppAttempt().getAppAttemptState());
+ }
+
protected RMApp testCreateAppNewSaving(
ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = createNewTestApp(submissionContext);
@@ -486,9 +487,10 @@ public void testAppNewKill() throws IOException {
LOG.info("--- START: testAppNewKill ---");
RMApp application = createNewTestApp(null);
- // NEW => KILLED event RMAppEventType.KILL
+ // NEW => KILLED event RMAppEventType.ATTEMPT_KILLED
RMAppEvent event =
- new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+ new RMAppEvent(application.getApplicationId(),
+ RMAppEventType.ATTEMPT_KILLED);
application.handle(event);
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
@@ -515,9 +517,10 @@ public void testAppNewSavingKill() throws IOException {
LOG.info("--- START: testAppNewSavingKill ---");
RMApp application = testCreateAppNewSaving(null);
- // NEW_SAVING => KILLED event RMAppEventType.KILL
+ // NEW_SAVING => KILLED event RMAppEventType.ATTEMPT_KILLED
RMAppEvent event =
- new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+ new RMAppEvent(application.getApplicationId(),
+ RMAppEventType.ATTEMPT_KILLED);
application.handle(event);
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
@@ -558,12 +561,9 @@ public void testAppSubmittedRejected() throws IOException {
public void testAppSubmittedKill() throws IOException, InterruptedException {
LOG.info("--- START: testAppSubmittedKill---");
RMApp application = testCreateAppSubmittedNoRecovery(null);
- // SUBMITTED => KILLED event RMAppEventType.KILL
- RMAppEvent event = new RMAppEvent(application.getApplicationId(),
- RMAppEventType.KILL);
- application.handle(event);
- rmDispatcher.await();
- assertAppAndAttemptKilled(application);
+ killAppAndAttempt(application);
+ sendAppUpdateSavedEvent(application);
+ assertKilled(application);
}
@Test
@@ -603,12 +603,10 @@ public void testAppAcceptedFailed() throws IOException {
public void testAppAcceptedKill() throws IOException, InterruptedException {
LOG.info("--- START: testAppAcceptedKill ---");
RMApp application = testCreateAppAccepted(null);
- // ACCEPTED => KILLED event RMAppEventType.KILL
- RMAppEvent event = new RMAppEvent(application.getApplicationId(),
- RMAppEventType.KILL);
- application.handle(event);
- rmDispatcher.await();
- assertAppAndAttemptKilled(application);
+ // ACCEPTED => KILLED event RMAppEventType.ATTEMPT_KILLED
+ killAppAndAttempt(application);
+ sendAppUpdateSavedEvent(application);
+ assertKilled(application);
}
@Test
@@ -616,11 +614,8 @@ public void testAppRunningKill() throws IOException {
LOG.info("--- START: testAppRunningKill ---");
RMApp application = testCreateAppRunning(null);
- // RUNNING => KILLED event RMAppEventType.KILL
- RMAppEvent event =
- new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
- application.handle(event);
- rmDispatcher.await();
+ // RUNNING => KILLED event RMAppEventType.ATTEMPT_KILLED
+ killAppAndAttempt(application);
// Ignore Attempt_Finished if we were supposed to go to Finished.
assertAppState(RMAppState.FINAL_SAVING, application);
@@ -677,8 +672,9 @@ public void testAppRunningFailed() throws IOException {
sendAppUpdateSavedEvent(application);
assertFailed(application, ".*Failing the application.*");
- // FAILED => FAILED event RMAppEventType.KILL
- event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+ // FAILED => FAILED event RMAppEventType.ATTEMPT_KILLED
+ event = new RMAppEvent(application.getApplicationId(),
+ RMAppEventType.ATTEMPT_KILLED);
application.handle(event);
rmDispatcher.await();
assertFailed(application, ".*Failing the application.*");
@@ -689,9 +685,9 @@ public void testAppFinishingKill() throws IOException {
LOG.info("--- START: testAppFinishedFinished ---");
RMApp application = testCreateAppFinishing(null);
- // FINISHING => FINISHED event RMAppEventType.KILL
- RMAppEvent event =
- new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+ // FINISHING => FINISHED event RMAppEventType.ATTEMPT_KILLED
+ RMAppEvent event = new RMAppEvent(application.getApplicationId(),
+ RMAppEventType.ATTEMPT_KILLED);
application.handle(event);
rmDispatcher.await();
assertAppState(RMAppState.FINISHED, application);
@@ -729,9 +725,9 @@ public void testAppFinishedFinished() throws IOException {
LOG.info("--- START: testAppFinishedFinished ---");
RMApp application = testCreateAppFinished(null, "");
- // FINISHED => FINISHED event RMAppEventType.KILL
- RMAppEvent event =
- new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+ // FINISHED => FINISHED event RMAppEventType.ATTEMPT_KILLED
+ RMAppEvent event = new RMAppEvent(application.getApplicationId(),
+ RMAppEventType.ATTEMPT_KILLED);
application.handle(event);
rmDispatcher.await();
assertTimesAtFinish(application);
@@ -756,9 +752,9 @@ public void testAppFailedFailed() throws IOException {
assertTimesAtFinish(application);
assertAppState(RMAppState.FAILED, application);
- // FAILED => FAILED event RMAppEventType.KILL
- event =
- new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+ // FAILED => FAILED event RMAppEventType.ATTEMPT_KILLED
+ event = new RMAppEvent(application.getApplicationId(),
+ RMAppEventType.ATTEMPT_KILLED);
application.handle(event);
rmDispatcher.await();
assertTimesAtFinish(application);
@@ -774,9 +770,9 @@ public void testAppKilledKilled() throws IOException {
RMApp application = testCreateAppRunning(null);
- // RUNNING => KILLED event RMAppEventType.KILL
- RMAppEvent event =
- new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+ // RUNNING => KILLED event RMAppEventType.ATTEMPT_KILLED
+ RMAppEvent event = new RMAppEvent(application.getApplicationId(),
+ RMAppEventType.ATTEMPT_KILLED);
application.handle(event);
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
@@ -809,8 +805,9 @@ public void testAppKilledKilled() throws IOException {
assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application);
- // KILLED => KILLED event RMAppEventType.KILL
- event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+ // KILLED => KILLED event RMAppEventType.ATTEMPT_KILLED
+ event = new RMAppEvent(application.getApplicationId(),
+ RMAppEventType.ATTEMPT_KILLED);
application.handle(event);
rmDispatcher.await();
assertTimesAtFinish(application);