diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index ad1c9f1..9c2bbcb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -45,6 +45,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -57,6 +58,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import com.sun.research.ws.wadl.Response; + /** * Registers/unregisters to RM and sends heartbeats to RM. */ @@ -194,7 +197,15 @@ protected void unregister() { FinishApplicationMasterRequest request = FinishApplicationMasterRequest.newInstance(finishState, sb.toString(), historyUrl); - scheduler.finishApplicationMaster(request); + while (true) { + FinishApplicationMasterResponse response = + scheduler.finishApplicationMaster(request); + if (response.getIsUnregistered()) { + break; + } + LOG.info("Waiting for application to be successfully unregistered."); + Thread.sleep(rmPollInterval); + } } catch(Exception are) { LOG.error("Exception while unregistering ", are); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java index 4317b67..cb08ef3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java @@ -26,21 +26,55 @@ import org.apache.hadoop.yarn.util.Records; /** - *

The response sent by the ResourceManager to a - * ApplicationMaster on it's completion.

+ *

+ * The response sent by the ResourceManager to a + * ApplicationMaster on it's completion. + *

* - *

Currently, this is empty.

+ *

+ * The response, includes: + *

+ *

+ * Note: This flag is only needed for RM recovery purpose. If RM recovery is + * enabled, user is expected to retry until this flag becomes true. + * Otherwise,user will risk restarting an already finished application after RM + * restarts. * * @see ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest) */ @Public @Stable public abstract class FinishApplicationMasterResponse { + @Private @Unstable - public static FinishApplicationMasterResponse newInstance() { + public static FinishApplicationMasterResponse newInstance( + boolean isRemovedFromRMStateStore) { FinishApplicationMasterResponse response = Records.newRecord(FinishApplicationMasterResponse.class); + response.setIsUnregistered(isRemovedFromRMStateStore); return response; } + + /** + * Get the flag which indicates that the application has successfully + * unregistered with RM and the application state has been removed from + * RMStateStore. + */ + @Public + @Stable + public abstract boolean getIsUnregistered(); + + /** + * Set the flag which indicates that the application has successfully + * unregistered with RM and the application state has been removed from + * RMStateStore. + */ + @Private + @Unstable + public abstract void setIsUnregistered(boolean isUnregistered); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 7b3d0cf..36e1d45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -52,6 +52,7 @@ message FinishApplicationMasterRequestProto { } message FinishApplicationMasterResponseProto { + optional bool isUnregistered = 1 [default = false]; } message AllocateRequestProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index c433b55..beee423 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -300,11 +301,24 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) throws YarnException, IOException { Preconditions.checkArgument(appStatus != null, - "AppStatus should not be null."); + "AppStatus should not be null."); FinishApplicationMasterRequest request = FinishApplicationMasterRequest.newInstance(appStatus, appMessage, appTrackingUrl); - rmClient.finishApplicationMaster(request); + try { + while (true) { + FinishApplicationMasterResponse response = + rmClient.finishApplicationMaster(request); + if (response.getIsUnregistered()) { + break; + } + LOG.info("Waiting for application to be successfully unregistered."); + Thread.sleep(100); + } + } catch (InterruptedException e) { + LOG.info("Interrupted while waiting for application" + + " to be removed from RMStateStore"); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java index ff57eb4..1bad374 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java @@ -22,7 +22,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProtoOrBuilder; import com.google.protobuf.TextFormat; @@ -67,4 +69,24 @@ public boolean equals(Object other) { public String toString() { return TextFormat.shortDebugString(getProto()); } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = FinishApplicationMasterResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public boolean getIsUnregistered() { + FinishApplicationMasterResponseProtoOrBuilder p = + viaProto ? proto : builder; + return p.getIsUnregistered(); + } + + @Override + public void setIsUnregistered(boolean isUnregistered) { + maybeInitBuilder(); + builder.setIsUnregistered(isUnregistered); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index a41792d..fd39dad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; @@ -303,9 +304,12 @@ public FinishApplicationMasterResponse finishApplicationMaster( .getTrackingUrl(), request.getFinalApplicationStatus(), request .getDiagnostics())); - FinishApplicationMasterResponse response = recordFactory - .newRecordInstance(FinishApplicationMasterResponse.class); - return response; + if (rmContext.getRMApps().get(applicationAttemptId.getApplicationId()) + .isAppSafeToUnregister()) { + return FinishApplicationMasterResponse.newInstance(true); + } else { + return FinishApplicationMasterResponse.newInstance(false); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 11248ba..36104a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -415,7 +415,7 @@ public GetApplicationsResponse getApplications( if (applicationStates != null && !applicationStates.isEmpty()) { if (!applicationStates.contains(RMServerUtils - .createApplicationState(application.getState()))) { + .createApplicationState(application))) { continue; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 6439df1..0c38f50 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -186,10 +186,6 @@ protected synchronized void finishApplication(ApplicationId applicationId) { completedApps.add(applicationId); writeAuditLog(applicationId); - - // application completely done. Remove from state - RMStateStore store = rmContext.getStateStore(); - store.removeApplication(rmContext.getRMApps().get(applicationId)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 15d3062..26c3ccf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -116,7 +117,12 @@ public static void validateBlacklistRequest(ResourceBlacklistRequest blacklistRe } } - public static YarnApplicationState createApplicationState(RMAppState rmAppState) { + public static YarnApplicationState createApplicationState(RMApp rmApp) { + RMAppState rmAppState = rmApp.getState(); + // If App is in REMOVING state, return its previous state. + if (rmAppState.equals(RMAppState.REMOVING)) { + rmAppState = rmApp.getPreviousStateAtRemoving(); + } switch(rmAppState) { case NEW: return YarnApplicationState.NEW; @@ -135,7 +141,8 @@ public static YarnApplicationState createApplicationState(RMAppState rmAppState) return YarnApplicationState.KILLED; case FAILED: return YarnApplicationState.FAILED; + default: + throw new YarnRuntimeException("Unknown state passed!"); } - throw new YarnRuntimeException("Unknown state passed!"); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index bdf4da3..0852ce8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -30,6 +30,7 @@ import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; @@ -108,7 +109,9 @@ public synchronized void storeApplicationAttemptState(String attemptIdStr, ApplicationState appState = state.getApplicationState().get( attemptState.getAttemptId().getApplicationId()); - assert appState != null; + if (appState == null) { + throw new YarnRuntimeException("Application doesn't exist"); + } if (appState.attempts.containsKey(attemptState.getAttemptId())) { Exception e = new IOException("Attempt: " + @@ -125,7 +128,9 @@ public synchronized void removeApplicationState(ApplicationState appState) throws Exception { ApplicationId appId = appState.getAppId(); ApplicationState removed = state.appState.remove(appId); - assert removed != null; + if (removed == null) { + throw new YarnRuntimeException("Removing non-exsisting application state"); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 179b721..382ed97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; @@ -482,12 +483,15 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { ApplicationState appState = ((RMStateStoreRemoveAppEvent) event).getAppState(); ApplicationId appId = appState.getAppId(); - + Exception removedException = null; LOG.info("Removing info for app: " + appId); try { removeApplicationState(appState); } catch (Exception e) { LOG.error("Error removing app: " + appId, e); + removedException = e; + } finally { + notifyDoneRemovingApplcation(appId, removedException); } } break; @@ -521,7 +525,18 @@ private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId, rmDispatcher.getEventHandler().handle( new RMAppAttemptStoredEvent(attemptId, storedException)); } - + + @SuppressWarnings("unchecked") + /** + * This is to notify RMApp that this application has been removed from + * RMStateStore + */ + private void notifyDoneRemovingApplcation(ApplicationId appId, + Exception removedException) { + rmDispatcher.getEventHandler().handle( + new RMAppRemovedEvent(appId, removedException)); + } + /** * EventHandler implementation which forward events to the FSRMStateStore * This hides the EventHandle methods of the store from its public interface diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index f1c496a..988b952 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -194,4 +194,19 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * @return the application type. */ String getApplicationType(); + + /** + * Check whether this application is safe to unregister. + * An application is deemed to be safe to unregister if it is an unmanaged + * AM or its state has been removed from state store. + * @return the flag which indicates whether this application is safe to + * unregister. + */ + boolean isAppSafeToUnregister(); + + /** + * Get the previous state when this application is in REMOVING state + * @return the previous state when this application is in REMOVING state + */ + RMAppState getPreviousStateAtRemoving(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index d15e12e..e7dba63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -27,11 +27,14 @@ // Source: RMAppAttempt APP_REJECTED, APP_ACCEPTED, - APP_SAVED, ATTEMPT_REGISTERED, - ATTEMPT_FINISHING, + ATTEMPT_UNREGISTERED, ATTEMPT_FINISHED, // Will send the final state ATTEMPT_FAILED, ATTEMPT_KILLED, - NODE_UPDATE + NODE_UPDATE, + + // Source: RMStateStore + APP_SAVED, + APP_REMOVED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index cbafffe..45d2deb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -109,6 +110,8 @@ private static final FinalTransition FINAL_TRANSITION = new FinalTransition(); private static final AppFinishedTransition FINISHED_TRANSITION = new AppFinishedTransition(); + private boolean isAppRemovalRequestSent = false; + private RMAppState previousStateAtRemoving; private static final StateMachineFactory updatedNodes) { @Override public String getApplicationType() { return YarnConfiguration.DEFAULT_APPLICATION_TYPE; + } + + @Override + public boolean isAppSafeToUnregister() { + return true; + } + + @Override + public RMAppState getPreviousStateAtRemoving() { + return null; }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 2c19597..1d2c812 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock; import static org.junit.Assume.assumeTrue; +import static org.mockito.Mockito.verify; import java.io.IOException; import java.util.Arrays; @@ -59,8 +60,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -78,6 +80,7 @@ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS; private static int appId = 1; private DrainDispatcher rmDispatcher; + private RMStateStore store; // ignore all the RM application attempt events private static final class TestApplicationAttemptEventDispatcher implements @@ -171,7 +174,7 @@ public void setUp() throws Exception { mock(ContainerAllocationExpirer.class); AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class); - RMStateStore store = mock(RMStateStore.class); + store = mock(RMStateStore.class); this.rmContext = new RMContextImpl(rmDispatcher, store, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, @@ -278,6 +281,10 @@ private static void assertTimesAtFinish(RMApp application) { (application.getFinishTime() >= application.getStartTime())); } + private void assertAppRemoved(RMApp application){ + verify(store).removeApplication(application); + } + private static void assertKilled(RMApp application) { assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); @@ -366,15 +373,27 @@ protected RMApp testCreateAppRunning( return application; } + protected RMApp testCreateAppRemoving( + ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = testCreateAppRunning(submissionContext); + RMAppEvent finishingEvent = + new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_UNREGISTERED); + application.handle(finishingEvent); + assertAppState(RMAppState.REMOVING, application); + assertAppRemoved(application); + return application; + } + protected RMApp testCreateAppFinishing( ApplicationSubmissionContext submissionContext) throws IOException { // unmanaged AMs don't use the FINISHING state assert submissionContext == null || !submissionContext.getUnmanagedAM(); - RMApp application = testCreateAppRunning(submissionContext); - // RUNNING => FINISHING event RMAppEventType.ATTEMPT_FINISHING + RMApp application = testCreateAppRemoving(submissionContext); + // REMOVING => FINISHING event RMAppEventType.APP_REMOVED RMAppEvent finishingEvent = new RMAppEvent(application.getApplicationId(), - RMAppEventType.ATTEMPT_FINISHING); + RMAppEventType.APP_REMOVED); application.handle(finishingEvent); assertAppState(RMAppState.FINISHING, application); assertTimesAtFinish(application); @@ -635,6 +654,43 @@ public void testAppRunningFailed() throws IOException { } @Test + public void testAppRemovingFinishing() throws IOException { + LOG.info("--- START: testAppRemovingFinishing ---"); + RMApp application = testCreateAppRemoving(null); + // APP_REMOVING => FINISHING event RMAppEventType.APP_REMOVED + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), + RMAppEventType.APP_REMOVED); + application.handle(event); + rmDispatcher.await(); + assertAppState(RMAppState.FINISHING, application); + } + + @Test + public void testAppRemovingFinished() throws IOException { + LOG.info("--- START: testAppRemovingFINISHED ---"); + RMApp application = testCreateAppRemoving(null); + // APP_REMOVING => FINISHED event RMAppEventType.ATTEMPT_FINISHED + RMAppEvent finishedEvent = new RMAppFinishedAttemptEvent( + application.getApplicationId(), null); + application.handle(finishedEvent); + rmDispatcher.await(); + assertAppState(RMAppState.FINISHED, application); + } + + @Test + public void testAppRemovingKilledD() throws IOException { + LOG.info("--- START: testAppRemovingKilledD ---"); + RMApp application = testCreateAppRemoving(null); + // APP_REMOVING => KILLED event RMAppEventType.KILL + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + application.handle(event); + rmDispatcher.await(); + assertAppState(RMAppState.KILLED, application); + } + + @Test public void testAppFinishingKill() throws IOException { LOG.info("--- START: testAppFinishedFinished ---");