diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java
index 4317b67..08e1220 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java
@@ -23,13 +23,25 @@
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.util.Records;
/**
- *
The response sent by the ResourceManager to a
- * ApplicationMaster on it's completion.
+ *
+ * The response sent by the ResourceManager to a
+ * ApplicationMaster on it's completion.
+ *
*
- * Currently, this is empty.
+ *
+ * The response, includes:
+ *
+ * - A flag to check whether the application information is removed from
+ * RMStateStore. This is only needed for RM recovery purpose.
+ *
+ *
+ * Note: If RM recovery is enabled, user is expected to retry this flag until it
+ * becomes true. Otherwise, will risk, though not likely, restarting the already
+ * finished application after RM restarts.
*
* @see ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)
*/
@@ -38,9 +50,27 @@
public abstract class FinishApplicationMasterResponse {
@Private
@Unstable
- public static FinishApplicationMasterResponse newInstance() {
+ public static FinishApplicationMasterResponse newInstance(boolean isRemovedFromRMStateStore) {
FinishApplicationMasterResponse response =
Records.newRecord(FinishApplicationMasterResponse.class);
+ response.setIsRemovedFromRMStateSore(isRemovedFromRMStateStore);
return response;
}
+
+ /**
+ * Get the flag which indicates that application information has been removed from
+ * RMStateStore.
+ */
+ @Public
+ @Stable
+ public abstract boolean getIsRemovedFromRMStateSore();
+
+ /**
+ * Set the flag which indicates that application information has been removed from
+ * RMStateStore.
+ */
+ @Private
+ @Unstable
+ public abstract boolean setIsRemovedFromRMStateSore(
+ boolean isRemovedFromRMStateStore);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index bd009e0..0bf9c72 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -52,6 +52,7 @@ message FinishApplicationMasterRequestProto {
}
message FinishApplicationMasterResponseProto {
+ optional bool is_removed_from_RMStateStore = 1 [default = false];
}
message AllocateRequestProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java
index ff57eb4..f4eb7dd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java
@@ -22,7 +22,9 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProtoOrBuilder;
import com.google.protobuf.TextFormat;
@@ -67,4 +69,25 @@ public boolean equals(Object other) {
public String toString() {
return TextFormat.shortDebugString(getProto());
}
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = FinishApplicationMasterResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public boolean getIsRemovedFromRMStateSore() {
+ FinishApplicationMasterResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getIsRemovedFromRMStateStore();
+ }
+
+ @Override
+ public boolean setIsRemovedFromRMStateSore(boolean isRemovedFromRMStateStore) {
+ maybeInitBuilder();
+ builder.setIsRemovedFromRMStateStore(isRemovedFromRMStateStore);
+ return false;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index a41792d..c8b2db2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -72,6 +72,7 @@
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
@@ -298,13 +299,19 @@ public FinishApplicationMasterResponse finishApplicationMaster(
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
+ // App is in finishing state, that means app info has been deleted from
+ // RMStateStore.
+ if (RMAppState.FINISHING.equals(rmContext.getRMApps()
+ .get(applicationAttemptId.getApplicationId()).getState())) {
+ return FinishApplicationMasterResponse.newInstance(true);
+ }
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
.getTrackingUrl(), request.getFinalApplicationStatus(), request
.getDiagnostics()));
- FinishApplicationMasterResponse response = recordFactory
- .newRecordInstance(FinishApplicationMasterResponse.class);
+ FinishApplicationMasterResponse response =
+ FinishApplicationMasterResponse.newInstance(false);
return response;
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 6439df1..634ea36 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -187,7 +187,8 @@ protected synchronized void finishApplication(ApplicationId applicationId) {
completedApps.add(applicationId);
writeAuditLog(applicationId);
- // application completely done. Remove from state
+ // application completely done and remove from state store.
+ // App state may be already removed during RMAppRemovingTransition.
RMStateStore store = rmContext.getStateStore();
store.removeApplication(rmContext.getRMApps().get(applicationId));
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index de1f65a..8516ff7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -336,6 +336,8 @@ private Path getAppDir(Path root, String appId) {
// FileSystem related code
private void deleteFile(Path deletePath) throws Exception {
+ if(!fs.exists(deletePath))
+ return;
if(!fs.delete(deletePath, true)) {
throw new Exception("Failed to delete " + deletePath);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 865e726..1a6342d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -50,6 +50,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
@@ -477,12 +478,15 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) {
ApplicationState appState =
((RMStateStoreRemoveAppEvent) event).getAppState();
ApplicationId appId = appState.getAppId();
-
+ Exception removedException = null;
LOG.info("Removing info for app: " + appId);
try {
removeApplicationState(appState);
} catch (Exception e) {
LOG.error("Error removing app: " + appId, e);
+ removedException = e;
+ } finally {
+ notifyDoneRemovingApplcation(appId, removedException);
}
}
break;
@@ -516,7 +520,18 @@ private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
rmDispatcher.getEventHandler().handle(
new RMAppAttemptStoredEvent(attemptId, storedException));
}
-
+
+ @SuppressWarnings("unchecked")
+ /**
+ * This is to notify RMApp that this application has been removed from
+ * RMStateStore
+ */
+ private void notifyDoneRemovingApplcation(ApplicationId appId,
+ Exception removedException) {
+ rmDispatcher.getEventHandler().handle(
+ new RMAppStoredEvent(appId, removedException));
+ }
+
/**
* EventHandler implementation which forward events to the FSRMStateStore
* This hides the EventHandle methods of the store from its public interface
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
index d15e12e..b77b08a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
@@ -27,11 +27,14 @@
// Source: RMAppAttempt
APP_REJECTED,
APP_ACCEPTED,
- APP_SAVED,
ATTEMPT_REGISTERED,
ATTEMPT_FINISHING,
ATTEMPT_FINISHED, // Will send the final state
ATTEMPT_FAILED,
ATTEMPT_KILLED,
- NODE_UPDATE
+ NODE_UPDATE,
+
+ // Source: RMStateStore
+ APP_SAVED,
+ APP_REMOVED
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index a11b05e..ba7552c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -57,6 +57,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -168,8 +169,10 @@
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
- .addTransition(RMAppState.RUNNING, RMAppState.FINISHING,
- RMAppEventType.ATTEMPT_FINISHING, new RMAppFinishingTransition())
+ .addTransition(RMAppState.RUNNING,
+ EnumSet.of(RMAppState.FINISHING, RMAppState.APP_REMOVING),
+ RMAppEventType.ATTEMPT_FINISHING,
+ new RMAppFinishingOrRemovingTransition())
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
.addTransition(RMAppState.RUNNING,
@@ -179,6 +182,19 @@
.addTransition(RMAppState.RUNNING, RMAppState.KILLED,
RMAppEventType.KILL, new KillAppAndAttemptTransition())
+ // Transitions from APP_REMOVING state
+ .addTransition(RMAppState.APP_REMOVING, RMAppState.FINISHING,
+ RMAppEventType.APP_REMOVED, new RMAppFinishingTransition())
+ .addTransition(RMAppState.APP_REMOVING, RMAppState.FINISHED,
+ RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+ .addTransition(RMAppState.APP_REMOVING, RMAppState.KILLED,
+ RMAppEventType.KILL, new KillAppAndAttemptTransition())
+ // ignorable transitions
+ .addTransition(RMAppState.APP_REMOVING, RMAppState.APP_REMOVING,
+ RMAppEventType.ATTEMPT_FINISHING)
+ .addTransition(RMAppState.APP_REMOVING, RMAppState.APP_REMOVING,
+ RMAppEventType.NODE_UPDATE)
+
// Transitions from FINISHING state
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
@@ -682,6 +698,22 @@ public void transition(RMAppImpl app, RMAppEvent event) {
}
}
+ private static final class RMAppFinishingOrRemovingTransition implements
+ MultipleArcTransition {
+ @Override
+ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+ boolean isRecoveryEnabled =
+ app.conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
+ YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
+ if (isRecoveryEnabled) {
+ LOG.info("Removing application with id " + app.applicationId);
+ app.rmContext.getStateStore().removeApplication(app);
+ return RMAppState.APP_REMOVING;
+ } else
+ return RMAppState.FINISHING;
+ }
+ }
+
private static class AppFinishedTransition extends FinalTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
RMAppFinishedAttemptEvent finishedEvent =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
index b7f9325..fa6e25f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
@@ -24,6 +24,7 @@
SUBMITTED,
ACCEPTED,
RUNNING,
+ APP_REMOVING,
FINISHING,
FINISHED,
FAILED,