diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/YarnApplicationState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/YarnApplicationState.java index 7b809da..6d6db90 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/YarnApplicationState.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/YarnApplicationState.java @@ -30,6 +30,9 @@ /** Application which was just created. */ NEW, + /** Application which is being saved. */ + SAVING, + /** Application which has been submitted. */ SUBMITTED, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 83aac74..8d7a9a3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -72,12 +72,13 @@ message ContainerProto { enum YarnApplicationStateProto { NEW = 1; - SUBMITTED = 2; - RUNNING = 3; - FINISHED = 4; - FAILED = 5; - KILLED = 6; - ACCEPTED = 7; + SAVING = 2; + SUBMITTED = 3; + RUNNING = 4; + FINISHED = 5; + FAILED = 6; + KILLED = 7; + ACCEPTED = 8; } enum FinalApplicationStatusProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index ba462e5..6a19d41 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -297,20 +297,6 @@ public SubmitApplicationResponse submitApplication( // So call handle directly and do not send an event. rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System .currentTimeMillis())); - - // If recovery is enabled then store the application information in a - // blocking call so make sure that RM has stored the information needed - // to restart the AM after RM restart without further client communication - RMStateStore stateStore = rmContext.getStateStore(); - LOG.info("Storing Application with id " + applicationId); - try { - stateStore.storeApplication(rmContext.getRMApps().get(applicationId)); - } catch (Exception e) { - // For HA this exception needs to be handled by giving up - // master status if we got fenced - LOG.error("Failed to store application:" + applicationId, e); - ExitUtil.terminate(1, e); - } LOG.info("Application with id " + applicationId.getId() + " submitted by user " + user); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 674a779..e492257 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -37,9 +37,12 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; +import com.google.common.annotations.VisibleForTesting; + @Private @Unstable /** @@ -125,7 +128,7 @@ public void setDispatcher(Dispatcher dispatcher) { this.rmDispatcher = dispatcher; } - AsyncDispatcher dispatcher; + protected AsyncDispatcher dispatcher; public synchronized void init(Configuration conf) throws Exception{ // create async handler @@ -166,21 +169,19 @@ public synchronized void close() throws Exception { public abstract RMState loadState() throws Exception; /** - * Blocking API + * Non-Blocking API * ResourceManager services use this to store the application's state - * This must not be called on the dispatcher thread + * This does not block the dispatcher threads + * RMAppStoredEvent will be sent on completion to notify the RMApp */ - public synchronized void storeApplication(RMApp app) throws Exception { + @SuppressWarnings("unchecked") + public synchronized void storeApplication(RMApp app) { ApplicationSubmissionContext context = app .getApplicationSubmissionContext(); assert context instanceof ApplicationSubmissionContextPBImpl; - - ApplicationStateDataPBImpl appStateData = new ApplicationStateDataPBImpl(); - appStateData.setSubmitTime(app.getSubmitTime()); - appStateData.setApplicationSubmissionContext(context); - - LOG.info("Storing info for app: " + context.getApplicationId()); - storeApplicationState(app.getApplicationId().toString(), appStateData); + ApplicationState appState = new ApplicationState( + app.getSubmitTime(), context); + dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); } /** @@ -255,6 +256,30 @@ protected abstract void removeApplicationState(ApplicationState appState) private synchronized void handleStoreEvent(RMStateStoreEvent event) { switch(event.getType()) { + case STORE_APP: + { + ApplicationState apptState = + ((RMStateStoreAppEvent) event).getAppState(); + Exception storedException = null; + ApplicationStateDataPBImpl appStateData = + new ApplicationStateDataPBImpl(); + appStateData.setSubmitTime(apptState.getSubmitTime()); + appStateData.setApplicationSubmissionContext( + apptState.getApplicationSubmissionContext()); + ApplicationId appId = + apptState.getApplicationSubmissionContext().getApplicationId(); + + LOG.info("Storing info for app: " + appId); + try { + storeApplicationState(appId.toString(), appStateData); + } catch (Exception e) { + LOG.error("Error storing app: " + appId, e); + storedException = e; + } finally { + notifyDoneStoringApplication(appId, storedException); + } + } + break; case STORE_APP_ATTEMPT: { ApplicationAttemptState attemptState = @@ -297,6 +322,20 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { LOG.error("Unknown RMStateStoreEvent type: " + event.getType()); } } + + @SuppressWarnings("unchecked") + /** + * In (@link storeApplication}, derived class can call this method to notify + * the application about operation completion + * @param appId id of the application that has been saved + * @param storedException the exception that is thrown when storing the + * application + */ + private void notifyDoneStoringApplication(ApplicationId appId, + Exception storedException) { + RMAppStoredEvent event = new RMAppStoredEvent(appId, storedException); + rmDispatcher.getEventHandler().handle(event); + } @SuppressWarnings("unchecked") /** @@ -314,7 +353,8 @@ private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId, * EventHandler implementation which forward events to the FSRMStateStore * This hides the EventHandle methods of the store from its public interface */ - private final class ForwardingEventHandler + @VisibleForTesting + public final class ForwardingEventHandler implements EventHandler { @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java new file mode 100644 index 0000000..99f8e37 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; + +public class RMStateStoreAppEvent extends RMStateStoreEvent { + + private final ApplicationState appState; + + public RMStateStoreAppEvent(ApplicationState appState) { + super(RMStateStoreEventType.STORE_APP); + this.appState = appState; + } + + public ApplicationState getAppState() { + return appState; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java index 22f155c..f5383fa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java @@ -20,5 +20,6 @@ public enum RMStateStoreEventType { STORE_APP_ATTEMPT, + STORE_APP, REMOVE_APP } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index 20eef1d..678f24f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -26,6 +26,7 @@ // Source: RMAppAttempt APP_REJECTED, APP_ACCEPTED, + APP_SAVED, ATTEMPT_REGISTERED, ATTEMPT_FINISHING, ATTEMPT_FINISHED, // Will send the final state diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 62a3ba7..85ed0bd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -118,13 +119,23 @@ // Transitions from NEW state .addTransition(RMAppState.NEW, RMAppState.NEW, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) - .addTransition(RMAppState.NEW, RMAppState.SUBMITTED, + .addTransition(RMAppState.NEW, RMAppState.SAVING, RMAppEventType.START, new StartAppAttemptTransition()) .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL, new AppKilledTransition()) .addTransition(RMAppState.NEW, RMAppState.FAILED, RMAppEventType.APP_REJECTED, new AppRejectedTransition()) + // Transitions from SAVING state + .addTransition(RMAppState.SAVING, RMAppState.SAVING, + RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.SAVING, RMAppState.SUBMITTED, + RMAppEventType.APP_SAVED, new RMAppSavedTransition()) + .addTransition(RMAppState.SAVING, RMAppState.KILLED, + RMAppEventType.KILL, new KillAppAndAttemptTransition()) + .addTransition(RMAppState.SAVING, RMAppState.FAILED, + RMAppEventType.APP_REJECTED, new AppRejectedTransition()) + // Transitions from SUBMITTED state .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) @@ -358,6 +369,8 @@ private YarnApplicationState createApplicationState(RMAppState rmAppState) { switch(rmAppState) { case NEW: return YarnApplicationState.NEW; + case SAVING: + return YarnApplicationState.SAVING; case SUBMITTED: return YarnApplicationState.SUBMITTED; case ACCEPTED: @@ -378,6 +391,7 @@ private YarnApplicationState createApplicationState(RMAppState rmAppState) { private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) { switch(state) { case NEW: + case SAVING: case SUBMITTED: case ACCEPTED: case RUNNING: @@ -574,6 +588,18 @@ private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) { + " with state:" + nodeState); } + private void checkAppStoreError(RMAppEvent event) { + RMAppStoredEvent storeEvent = (RMAppStoredEvent) event; + if(storeEvent.getStoredException() != null) + { + // For HA this exception needs to be handled by giving up + // master status if we got fenced + LOG.error("Failed to store application:" + getApplicationId(), + storeEvent.getStoredException()); + ExitUtil.terminate(1, storeEvent.getStoredException()); + } + } + private static class RMAppTransition implements SingleArcTransition { public void transition(RMAppImpl app, RMAppEvent event) { @@ -592,6 +618,14 @@ public void transition(RMAppImpl app, RMAppEvent event) { private static final class StartAppAttemptTransition extends RMAppTransition { public void transition(RMAppImpl app, RMAppEvent event) { app.createNewAttempt(true); + + // If recovery is enabled then store the application information in a + // non-blocking call so make sure that RM has stored the information + // needed to restart the AM after RM restart without further client + // communication + LOG.info("Storing application with id " + app.applicationId); + app.rmContext.getStateStore().storeApplication( + app.rmContext.getRMApps().get(app.applicationId)); }; } @@ -603,6 +637,13 @@ public void transition(RMAppImpl app, RMAppEvent event) { } } + private static final class RMAppSavedTransition extends RMAppTransition { + @Override + public void transition(RMAppImpl app, RMAppEvent event) { + app.checkAppStoreError(event); + } + } + private static class AppFinishedTransition extends FinalTransition { public void transition(RMAppImpl app, RMAppEvent event) { RMAppFinishedAttemptEvent finishedEvent = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java index a9e3ce1..fc4cf55 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java @@ -19,5 +19,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; public enum RMAppState { - NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHING, FINISHED, FAILED, KILLED + NEW, + SAVING, + SUBMITTED, + ACCEPTED, + RUNNING, + FINISHING, + FINISHED, + FAILED, + KILLED } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppStoredEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppStoredEvent.java new file mode 100644 index 0000000..76fb1df --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppStoredEvent.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +public class RMAppStoredEvent extends RMAppEvent { + + private final Exception storedException; + + public RMAppStoredEvent(ApplicationId appId, Exception storedException) { + super(appId, RMAppEventType.APP_SAVED); + this.storedException = storedException; + } + + public Exception getStoredException() { + return storedException; + } + +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java index a4826e8..69ffe6a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java @@ -63,6 +63,7 @@ public void scheduler() { // limit applications to those in states relevant to scheduling set(YarnWebParams.APP_STATE, StringHelper.cjoin( RMAppState.NEW.toString(), + RMAppState.SAVING.toString(), RMAppState.SUBMITTED.toString(), RMAppState.ACCEPTED.toString(), RMAppState.RUNNING.toString(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java index 8a38278..ec67483 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java @@ -83,7 +83,9 @@ public AppInfo(RMApp app, Boolean hasAccess) { String trackingUrl = app.getTrackingUrl(); this.state = app.getState(); this.trackingUrlIsNotReady = trackingUrl == null || trackingUrl.isEmpty() - || RMAppState.NEW == this.state || RMAppState.SUBMITTED == this.state + || RMAppState.NEW == this.state + || RMAppState.SAVING == this.state + || RMAppState.SUBMITTED == this.state || RMAppState.ACCEPTED == this.state; this.trackingUI = this.trackingUrlIsNotReady ? "UNASSIGNED" : (app .getFinishTime() == 0 ? "ApplicationMaster" : "History"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 8e8e485..062ccc9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -21,6 +21,8 @@ import static org.mockito.Mockito.mock; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import junit.framework.Assert; @@ -32,15 +34,18 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; -import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -64,6 +69,7 @@ private static int maxAppAttempts = 4; private static int appId = 1; private DrainDispatcher rmDispatcher; + private TestRMStateStore rmStateStore; // ignore all the RM application attempt events private static final class TestApplicationAttemptEventDispatcher implements @@ -113,22 +119,54 @@ public void handle(RMAppEvent event) { } } - // handle all the RM application manager events - same as in - // ResourceManager.java - private static final class TestApplicationManagerEventDispatcher implements - EventHandler { - @Override - public void handle(RMAppManagerEvent event) { - } - } - // handle all the scheduler events - same as in ResourceManager.java private static final class TestSchedulerEventDispatcher implements EventHandler { @Override public void handle(SchedulerEvent event) { } - } + } + + // Test whether application store is succeeded or not + private static final class TestRMStateStore extends NullRMStateStore { + + private Map store + = new HashMap(); + + public synchronized void init(Configuration conf) throws Exception{ + // create async handler + dispatcher = new DrainDispatcher(); + dispatcher.init(conf); + dispatcher.register(RMStateStoreEventType.class, + new ForwardingEventHandler()); + dispatcher.start(); + + initInternal(conf); + } + + @Override + protected void storeApplicationState(String appId, + ApplicationStateDataPBImpl appStateData) throws Exception { + store.put(appId, appStateData); + super.storeApplicationState(appId, appStateData); + } + + @Override + protected void removeApplicationState(ApplicationState appState) + throws Exception { + store.remove(appState.getAppId().toString()); + super.removeApplicationState(appState); + } + + public ApplicationStateDataPBImpl getApplicationStateDataPBImpl( + String appId) { + return store.get(appId); + } + + public void awaitDispatcher() { + ((DrainDispatcher) dispatcher).await(); + } + } @Before public void setUp() throws Exception { @@ -138,8 +176,11 @@ public void setUp() throws Exception { mock(ContainerAllocationExpirer.class); AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class); + rmStateStore = new TestRMStateStore(); + rmStateStore.init(conf); + rmStateStore.setDispatcher(rmDispatcher); this.rmContext = - new RMContextImpl(rmDispatcher, + new RMContextImpl(rmDispatcher, rmStateStore, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, null, new ApplicationTokenSecretManager(conf), new RMContainerTokenSecretManager(conf), @@ -152,11 +193,14 @@ public void setUp() throws Exception { new TestApplicationEventDispatcher(rmContext)); rmDispatcher.register(RMAppManagerEventType.class, - new TestApplicationManagerEventDispatcher()); + new RMAppManager(rmContext, null, null, null, conf)); rmDispatcher.register(SchedulerEventType.class, new TestSchedulerEventDispatcher()); - + + rmDispatcher.register(RMStateStoreEventType.class, + rmStateStore.new ForwardingEventHandler()); + rmDispatcher.init(conf); rmDispatcher.start(); } @@ -176,16 +220,25 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext) if(submissionContext == null) { submissionContext = new ApplicationSubmissionContextPBImpl(); } + submissionContext.setApplicationId(applicationId); RMApp application = new RMAppImpl(applicationId, rmContext, conf, name, user, queue, submissionContext, scheduler, masterService, System.currentTimeMillis()); + Assert.assertNull("application already exists", + rmContext.getRMApps().putIfAbsent(applicationId, application)); testAppStartState(applicationId, user, name, queue, application); return application; } + private void assertApplicationRemoved(RMApp application) { + rmStateStore.awaitDispatcher(); + Assert.assertNull(rmStateStore.getApplicationStateDataPBImpl( + application.getApplicationId().toString())); + } + // Test expected newly created app state private static void testAppStartState(ApplicationId applicationId, String user, String name, String queue, RMApp application) { @@ -264,21 +317,30 @@ private static void assertFailed(RMApp application, String regex) { diag.toString().matches(regex)); } - protected RMApp testCreateAppSubmitted( + protected RMApp testCreateAppSavingSubmitted( ApplicationSubmissionContext submissionContext) throws IOException { - RMApp application = createNewTestApp(submissionContext); - // NEW => SUBMITTED event RMAppEventType.START + RMApp application = createNewTestApp(submissionContext); + // NEW => SAVING => SUBMITTED event RMAppEventType.START + // There is a transit state SAVING, but it's difficult to trace it because + // two async dispatchers handle the events in the unpredictable order. RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.START); + new RMAppEvent(application.getApplicationId(), + RMAppEventType.START); application.handle(event); + rmDispatcher.await(); + rmStateStore.awaitDispatcher(); + rmDispatcher.await(); assertStartTimeSet(application); + // make sure SAVING is gone through and an application has been stored assertAppState(RMAppState.SUBMITTED, application); + Assert.assertNotNull(rmStateStore.getApplicationStateDataPBImpl( + application.getApplicationId().toString())); return application; } protected RMApp testCreateAppAccepted( ApplicationSubmissionContext submissionContext) throws IOException { - RMApp application = testCreateAppSubmitted(submissionContext); + RMApp application = testCreateAppSavingSubmitted(submissionContext); // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED RMAppEvent event = new RMAppEvent(application.getApplicationId(), @@ -332,16 +394,18 @@ protected RMApp testCreateAppFinished( RMAppEvent finishedEvent = new RMAppFinishedAttemptEvent( application.getApplicationId(), diagnostics); application.handle(finishedEvent); + rmDispatcher.await(); assertAppState(RMAppState.FINISHED, application); assertTimesAtFinish(application); // finished without a proper unregister implies failed assertFinalAppStatus(FinalApplicationStatus.FAILED, application); Assert.assertTrue("Finished app missing diagnostics", application.getDiagnostics().indexOf(diagnostics) != -1); + assertApplicationRemoved(application); return application; } - @Test + @Test (timeout = 30000) public void testUnmanagedApp() throws IOException { ApplicationSubmissionContext subContext = new ApplicationSubmissionContextPBImpl(); subContext.setUnmanagedAM(true); @@ -366,7 +430,7 @@ public void testUnmanagedApp() throws IOException { ".*Unmanaged application.*Failing the application.*"); } - @Test + @Test (timeout = 30000) public void testAppSuccessPath() throws IOException { LOG.info("--- START: testAppSuccessPath ---"); final String diagMsg = "some diagnostics"; @@ -375,7 +439,7 @@ public void testAppSuccessPath() throws IOException { application.getDiagnostics().indexOf(diagMsg) != -1); } - @Test + @Test (timeout = 30000) public void testAppNewKill() throws IOException { LOG.info("--- START: testAppNewKill ---"); @@ -388,7 +452,7 @@ public void testAppNewKill() throws IOException { assertKilled(application); } - @Test + @Test (timeout = 30000) public void testAppNewReject() throws IOException { LOG.info("--- START: testAppNewReject ---"); @@ -402,11 +466,11 @@ public void testAppNewReject() throws IOException { assertFailed(application, rejectedText); } - @Test + @Test (timeout = 30000) public void testAppSubmittedRejected() throws IOException { LOG.info("--- START: testAppSubmittedRejected ---"); - RMApp application = testCreateAppSubmitted(null); + RMApp application = testCreateAppSavingSubmitted(null); // SUBMITTED => FAILED event RMAppEventType.APP_REJECTED String rejectedText = "app rejected"; RMAppEvent event = @@ -414,12 +478,13 @@ public void testAppSubmittedRejected() throws IOException { application.handle(event); rmDispatcher.await(); assertFailed(application, rejectedText); + assertApplicationRemoved(application); } - @Test + @Test (timeout = 30000) public void testAppSubmittedKill() throws IOException, InterruptedException { LOG.info("--- START: testAppSubmittedKill---"); - RMApp application = testCreateAppSubmitted(null); + RMApp application = testCreateAppSavingSubmitted(null); // SUBMITTED => KILLED event RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); @@ -429,9 +494,10 @@ public void testAppSubmittedKill() throws IOException, InterruptedException { rmDispatcher.await(); assertKilled(application); assertAppAndAttemptKilled(application); + assertApplicationRemoved(application); } - @Test + @Test (timeout = 30000) public void testAppAcceptedFailed() throws IOException { LOG.info("--- START: testAppAcceptedFailed ---"); @@ -460,9 +526,10 @@ public void testAppAcceptedFailed() throws IOException { application.handle(event); rmDispatcher.await(); assertFailed(application, ".*" + message + ".*Failing the application.*"); + assertApplicationRemoved(application); } - @Test + @Test (timeout = 30000) public void testAppAcceptedKill() throws IOException, InterruptedException { LOG.info("--- START: testAppAcceptedKill ---"); RMApp application = testCreateAppAccepted(null); @@ -475,9 +542,10 @@ public void testAppAcceptedKill() throws IOException, InterruptedException { rmDispatcher.await(); assertKilled(application); assertAppAndAttemptKilled(application); + assertApplicationRemoved(application); } - @Test + @Test (timeout = 30000) public void testAppRunningKill() throws IOException { LOG.info("--- START: testAppRunningKill ---"); @@ -488,9 +556,10 @@ public void testAppRunningKill() throws IOException { application.handle(event); rmDispatcher.await(); assertKilled(application); + assertApplicationRemoved(application); } - @Test + @Test (timeout = 30000) public void testAppRunningFailed() throws IOException { LOG.info("--- START: testAppRunningFailed ---"); @@ -532,6 +601,7 @@ public void testAppRunningFailed() throws IOException { application.handle(event); rmDispatcher.await(); assertFailed(application, ".*Failing the application.*"); + assertApplicationRemoved(application); // FAILED => FAILED event RMAppEventType.KILL event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); @@ -540,7 +610,7 @@ public void testAppRunningFailed() throws IOException { assertFailed(application, ".*Failing the application.*"); } - @Test + @Test (timeout = 30000) public void testAppFinishingKill() throws IOException { LOG.info("--- START: testAppFinishedFinished ---"); @@ -551,9 +621,10 @@ public void testAppFinishingKill() throws IOException { application.handle(event); rmDispatcher.await(); assertAppState(RMAppState.FINISHED, application); + assertApplicationRemoved(application); } - @Test + @Test (timeout = 30000) public void testAppFinishedFinished() throws IOException { LOG.info("--- START: testAppFinishedFinished ---"); @@ -570,7 +641,7 @@ public void testAppFinishedFinished() throws IOException { "", diag.toString()); } - @Test + @Test (timeout = 30000) public void testAppKilledKilled() throws IOException { LOG.info("--- START: testAppKilledKilled ---"); @@ -583,6 +654,7 @@ public void testAppKilledKilled() throws IOException { rmDispatcher.await(); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); + assertApplicationRemoved(application); // KILLED => KILLED event RMAppEventType.ATTEMPT_FINISHED event = new RMAppFinishedAttemptEvent( @@ -618,7 +690,7 @@ public void testAppKilledKilled() throws IOException { assertAppState(RMAppState.KILLED, application); } - @Test + @Test (timeout = 30000) public void testGetAppReport() { RMApp app = createNewTestApp(null); assertAppState(RMAppState.NEW, app);