diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
index ad1c9f1..29bb092 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
@@ -45,6 +45,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -57,6 +58,8 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import com.sun.research.ws.wadl.Response;
+
/**
* Registers/unregisters to RM and sends heartbeats to RM.
*/
@@ -194,7 +197,15 @@ protected void unregister() {
FinishApplicationMasterRequest request =
FinishApplicationMasterRequest.newInstance(finishState,
sb.toString(), historyUrl);
- scheduler.finishApplicationMaster(request);
+ while (true) {
+ FinishApplicationMasterResponse response =
+ scheduler.finishApplicationMaster(request);
+ if (response.getUnregistered()) {
+ break;
+ }
+ Thread.sleep(rmPollInterval);
+ LOG.info("Waiting for application to be successfully unregistered.");
+ }
} catch(Exception are) {
LOG.error("Exception while unregistering ", are);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java
index 4317b67..56ff900 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java
@@ -26,21 +26,55 @@
import org.apache.hadoop.yarn.util.Records;
/**
- *
The response sent by the ResourceManager to a
- * ApplicationMaster on it's completion.
+ *
+ * The response sent by the ResourceManager to a
+ * ApplicationMaster on it's completion.
+ *
*
- * Currently, this is empty.
+ *
+ * The response, includes:
+ *
+ * - A flag which indicates that the application has successfully unregistered
+ * with RM and the application state has been removed from removed from
+ * RMStateStore.
+ *
+ *
+ * Note: This flag only needed for RM recovery purpose. If RM recovery is
+ * enabled, user is expected to retry this flag until it becomes true.
+ * Otherwise,user will risk restarting an already finished application after RM
+ * restarts.
*
* @see ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)
*/
@Public
@Stable
public abstract class FinishApplicationMasterResponse {
+
@Private
@Unstable
- public static FinishApplicationMasterResponse newInstance() {
+ public static FinishApplicationMasterResponse newInstance(
+ boolean isRemovedFromRMStateStore) {
FinishApplicationMasterResponse response =
Records.newRecord(FinishApplicationMasterResponse.class);
+ response.setUnregistered(isRemovedFromRMStateStore);
return response;
}
+
+ /**
+ * Get the flag which indicates that the application has successfully
+ * unregistered with RM and the application state has been removed from
+ * RMStateStore.
+ */
+ @Public
+ @Stable
+ public abstract boolean getUnregistered();
+
+ /**
+ * Set the flag which indicates that the application has successfully
+ * unregistered with RM and the application state has been removed from
+ * RMStateStore.
+ */
+ @Private
+ @Unstable
+ public abstract void setUnregistered(boolean isUnregistered);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 7b3d0cf..713d5bc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -52,6 +52,7 @@ message FinishApplicationMasterRequestProto {
}
message FinishApplicationMasterResponseProto {
+ optional bool unregistered = 1 [default = false];
}
message AllocateRequestProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index c433b55..b3cb8bc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -300,11 +301,24 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
String appMessage, String appTrackingUrl) throws YarnException,
IOException {
Preconditions.checkArgument(appStatus != null,
- "AppStatus should not be null.");
+ "AppStatus should not be null.");
FinishApplicationMasterRequest request =
FinishApplicationMasterRequest.newInstance(appStatus, appMessage,
appTrackingUrl);
- rmClient.finishApplicationMaster(request);
+ try {
+ while (true) {
+ FinishApplicationMasterResponse response =
+ rmClient.finishApplicationMaster(request);
+ if (response.getUnregistered()) {
+ break;
+ }
+ Thread.sleep(1000);
+ LOG.info("Waiting for application to be successfully unregistered.");
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted while waiting for application"
+ + " to be removed from RMStateStore");
+ }
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java
index ff57eb4..e85048c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java
@@ -22,7 +22,9 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProtoOrBuilder;
import com.google.protobuf.TextFormat;
@@ -67,4 +69,24 @@ public boolean equals(Object other) {
public String toString() {
return TextFormat.shortDebugString(getProto());
}
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = FinishApplicationMasterResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public boolean getUnregistered() {
+ FinishApplicationMasterResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getUnregistered();
+ }
+
+ @Override
+ public void setUnregistered(boolean isUnregistered) {
+ maybeInitBuilder();
+ builder.setUnregistered(isUnregistered);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index a41792d..487bc68 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -72,6 +72,7 @@
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
@@ -298,13 +299,20 @@ public FinishApplicationMasterResponse finishApplicationMaster(
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
+ // Application state has been removed from RMStateStore, if it's in
+ // FINISHING state
+ if (rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
+ .getState().equals(RMAppState.FINISHING)) {
+ return FinishApplicationMasterResponse.newInstance(true);
+ }
+
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
.getTrackingUrl(), request.getFinalApplicationStatus(), request
.getDiagnostics()));
- FinishApplicationMasterResponse response = recordFactory
- .newRecordInstance(FinishApplicationMasterResponse.class);
+ FinishApplicationMasterResponse response =
+ FinishApplicationMasterResponse.newInstance(false);
return response;
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 6439df1..0c38f50 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -186,10 +186,6 @@ protected synchronized void finishApplication(ApplicationId applicationId) {
completedApps.add(applicationId);
writeAuditLog(applicationId);
-
- // application completely done. Remove from state
- RMStateStore store = rmContext.getStateStore();
- store.removeApplication(rmContext.getRMApps().get(applicationId));
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 30d5d41..bba8425 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -336,6 +336,8 @@ private Path getAppDir(Path root, String appId) {
// FileSystem related code
private void deleteFile(Path deletePath) throws Exception {
+ if(!fs.exists(deletePath))
+ return;
if(!fs.delete(deletePath, true)) {
throw new Exception("Failed to delete " + deletePath);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 179b721..382ed97 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -51,6 +51,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
@@ -482,12 +483,15 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) {
ApplicationState appState =
((RMStateStoreRemoveAppEvent) event).getAppState();
ApplicationId appId = appState.getAppId();
-
+ Exception removedException = null;
LOG.info("Removing info for app: " + appId);
try {
removeApplicationState(appState);
} catch (Exception e) {
LOG.error("Error removing app: " + appId, e);
+ removedException = e;
+ } finally {
+ notifyDoneRemovingApplcation(appId, removedException);
}
}
break;
@@ -521,7 +525,18 @@ private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
rmDispatcher.getEventHandler().handle(
new RMAppAttemptStoredEvent(attemptId, storedException));
}
-
+
+ @SuppressWarnings("unchecked")
+ /**
+ * This is to notify RMApp that this application has been removed from
+ * RMStateStore
+ */
+ private void notifyDoneRemovingApplcation(ApplicationId appId,
+ Exception removedException) {
+ rmDispatcher.getEventHandler().handle(
+ new RMAppRemovedEvent(appId, removedException));
+ }
+
/**
* EventHandler implementation which forward events to the FSRMStateStore
* This hides the EventHandle methods of the store from its public interface
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
index d15e12e..e7dba63 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
@@ -27,11 +27,14 @@
// Source: RMAppAttempt
APP_REJECTED,
APP_ACCEPTED,
- APP_SAVED,
ATTEMPT_REGISTERED,
- ATTEMPT_FINISHING,
+ ATTEMPT_UNREGISTERED,
ATTEMPT_FINISHED, // Will send the final state
ATTEMPT_FAILED,
ATTEMPT_KILLED,
- NODE_UPDATE
+ NODE_UPDATE,
+
+ // Source: RMStateStore
+ APP_SAVED,
+ APP_REMOVED
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index c69aed3..00556c2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -167,8 +168,9 @@
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
- .addTransition(RMAppState.RUNNING, RMAppState.FINISHING,
- RMAppEventType.ATTEMPT_FINISHING, new RMAppFinishingTransition())
+ .addTransition(RMAppState.RUNNING, RMAppState.REMOVING,
+ RMAppEventType.ATTEMPT_UNREGISTERED,
+ new RMAppRemovingTransition())
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
.addTransition(RMAppState.RUNNING,
@@ -178,6 +180,19 @@
.addTransition(RMAppState.RUNNING, RMAppState.KILLED,
RMAppEventType.KILL, new KillAppAndAttemptTransition())
+ // Transitions from REMOVING state
+ .addTransition(RMAppState.REMOVING, RMAppState.FINISHING,
+ RMAppEventType.APP_REMOVED, new RMAppFinishingTransition())
+ .addTransition(RMAppState.REMOVING, RMAppState.FINISHED,
+ RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+ .addTransition(RMAppState.REMOVING, RMAppState.KILLED,
+ RMAppEventType.KILL, new KillAppAndAttemptTransition())
+ // ignorable transitions
+ .addTransition(RMAppState.REMOVING, RMAppState.REMOVING,
+ RMAppEventType.ATTEMPT_UNREGISTERED)
+ .addTransition(RMAppState.REMOVING, RMAppState.REMOVING,
+ RMAppEventType.NODE_UPDATE)
+
// Transitions from FINISHING state
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
@@ -185,7 +200,7 @@
RMAppEventType.KILL, new KillAppAndAttemptTransition())
// ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
- RMAppEventType.NODE_UPDATE)
+ EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.APP_REMOVED))
// Transitions from FINISHED state
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
@@ -194,12 +209,14 @@
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
EnumSet.of(
RMAppEventType.NODE_UPDATE,
- RMAppEventType.ATTEMPT_FINISHING,
- RMAppEventType.ATTEMPT_FINISHED))
+ RMAppEventType.ATTEMPT_UNREGISTERED,
+ RMAppEventType.ATTEMPT_FINISHED,
+ RMAppEventType.APP_REMOVED))
// Transitions from FAILED state
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
- EnumSet.of(RMAppEventType.KILL, RMAppEventType.APP_SAVED))
+ EnumSet.of(RMAppEventType.KILL, RMAppEventType.APP_SAVED,
+ RMAppEventType.APP_REMOVED))
// ignorable transitions
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
RMAppEventType.NODE_UPDATE)
@@ -211,7 +228,8 @@
EnumSet.of(RMAppEventType.APP_ACCEPTED,
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
- RMAppEventType.ATTEMPT_KILLED, RMAppEventType.APP_SAVED))
+ RMAppEventType.ATTEMPT_KILLED, RMAppEventType.APP_SAVED,
+ RMAppEventType.APP_REMOVED))
// ignorable transitions
.addTransition(RMAppState.KILLED, RMAppState.KILLED,
RMAppEventType.NODE_UPDATE)
@@ -657,6 +675,14 @@ public void transition(RMAppImpl app, RMAppEvent event) {
}
}
+ private static final class RMAppRemovingTransition extends RMAppTransition {
+ @Override
+ public void transition(RMAppImpl app, RMAppEvent event) {
+ LOG.info("Removing application with id " + app.applicationId);
+ app.rmContext.getStateStore().removeApplication(app);
+ }
+ }
+
private static class AppFinishedTransition extends FinalTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
RMAppFinishedAttemptEvent finishedEvent =
@@ -712,6 +738,11 @@ public void transition(RMAppImpl app, RMAppEvent event) {
if (app.getState() != RMAppState.FINISHING) {
app.finishTime = System.currentTimeMillis();
}
+ // application completely done and remove from state store.
+ // App state may be already removed during RMAppFinishingOrRemovingTransition.
+ RMStateStore store = app.rmContext.getStateStore();
+ store.removeApplication(app);
+
app.handler.handle(
new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED));
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRemovedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRemovedEvent.java
new file mode 100644
index 0000000..b780f39
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRemovedEvent.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public class RMAppRemovedEvent extends RMAppEvent {
+
+ private final Exception removedException;
+
+ public RMAppRemovedEvent(ApplicationId appId, Exception storedException) {
+ super(appId, RMAppEventType.APP_REMOVED);
+ this.removedException = storedException;
+ }
+
+ public Exception getStoredException() {
+ return removedException;
+ }
+
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
index b7f9325..e9ce5b4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
@@ -24,6 +24,7 @@
SUBMITTED,
ACCEPTED,
RUNNING,
+ REMOVING,
FINISHING,
FINISHED,
FAILED,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 00397cf..33a917b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -1122,7 +1122,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
ApplicationId applicationId =
appAttempt.getAppAttemptId().getApplicationId();
appAttempt.eventHandler.handle(
- new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_FINISHING));
+ new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_UNREGISTERED));
return RMAppAttemptState.FINISHING;
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index d6bd3f6..9477df1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
@@ -53,8 +54,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -67,6 +69,7 @@
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS;
private static int appId = 1;
private DrainDispatcher rmDispatcher;
+ private RMStateStore store;
// ignore all the RM application attempt events
private static final class TestApplicationAttemptEventDispatcher implements
@@ -141,7 +144,7 @@ public void setUp() throws Exception {
mock(ContainerAllocationExpirer.class);
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
- RMStateStore store = mock(RMStateStore.class);
+ store = mock(RMStateStore.class);
this.rmContext =
new RMContextImpl(rmDispatcher, store,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
@@ -247,6 +250,10 @@ private static void assertTimesAtFinish(RMApp application) {
(application.getFinishTime() >= application.getStartTime()));
}
+ private void assertAppRemoved(RMApp application){
+ verify(store).removeApplication(application);
+ }
+
private static void assertKilled(RMApp application) {
assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application);
@@ -335,15 +342,27 @@ protected RMApp testCreateAppRunning(
return application;
}
+ protected RMApp testCreateAppRemoving(
+ ApplicationSubmissionContext submissionContext) throws IOException {
+ RMApp application = testCreateAppRunning(submissionContext);
+ RMAppEvent finishingEvent =
+ new RMAppEvent(application.getApplicationId(),
+ RMAppEventType.ATTEMPT_UNREGISTERED);
+ application.handle(finishingEvent);
+ assertAppState(RMAppState.REMOVING, application);
+ assertAppRemoved(application);
+ return application;
+ }
+
protected RMApp testCreateAppFinishing(
ApplicationSubmissionContext submissionContext) throws IOException {
// unmanaged AMs don't use the FINISHING state
assert submissionContext == null || !submissionContext.getUnmanagedAM();
- RMApp application = testCreateAppRunning(submissionContext);
- // RUNNING => FINISHING event RMAppEventType.ATTEMPT_FINISHING
+ RMApp application = testCreateAppRemoving(submissionContext);
+ // REMOVING => FINISHING event RMAppEventType.APP_REMOVED
RMAppEvent finishingEvent =
new RMAppEvent(application.getApplicationId(),
- RMAppEventType.ATTEMPT_FINISHING);
+ RMAppEventType.APP_REMOVED);
application.handle(finishingEvent);
assertAppState(RMAppState.FINISHING, application);
assertTimesAtFinish(application);
@@ -608,6 +627,43 @@ public void testAppRunningFailed() throws IOException {
}
@Test
+ public void testAppRemovingFinishing() throws IOException {
+ LOG.info("--- START: testAppRemovingFinishing ---");
+ RMApp application = testCreateAppRemoving(null);
+ // APP_REMOVING => FINISHING event RMAppEventType.APP_REMOVED
+ RMAppEvent event =
+ new RMAppEvent(application.getApplicationId(),
+ RMAppEventType.APP_REMOVED);
+ application.handle(event);
+ rmDispatcher.await();
+ assertAppState(RMAppState.FINISHING, application);
+ }
+
+ @Test
+ public void testAppRemovingFinished() throws IOException {
+ LOG.info("--- START: testAppRemovingFINISHED ---");
+ RMApp application = testCreateAppRemoving(null);
+ // APP_REMOVING => FINISHED event RMAppEventType.ATTEMPT_FINISHED
+ RMAppEvent finishedEvent = new RMAppFinishedAttemptEvent(
+ application.getApplicationId(), null);
+ application.handle(finishedEvent);
+ rmDispatcher.await();
+ assertAppState(RMAppState.FINISHED, application);
+ }
+
+ @Test
+ public void testAppRemovingKilledD() throws IOException {
+ LOG.info("--- START: testAppRemovingKilledD ---");
+ RMApp application = testCreateAppRemoving(null);
+ // APP_REMOVING => KILLED event RMAppEventType.KILL
+ RMAppEvent event =
+ new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+ application.handle(event);
+ rmDispatcher.await();
+ assertAppState(RMAppState.KILLED, application);
+ }
+
+ @Test
public void testAppFinishingKill() throws IOException {
LOG.info("--- START: testAppFinishedFinished ---");