diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 6e1b925..232014f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; +import java.net.ConnectException; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.Map; @@ -306,35 +307,33 @@ protected void submitApplication( appState.getApplicationSubmissionContext(); ApplicationId appId = appState.getAppId(); - // create and recover app. - RMAppImpl application = - createAndPopulateNewRMApp(appContext, appState.getSubmitTime(), - appState.getUser()); - application.recover(rmState); - if (isApplicationInFinalState(appState.getState())) { - // We are synchronously moving the application into final state so that - // momentarily client will not see this application in NEW state. Also - // for finished applications we will avoid renewing tokens. - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); - return; - } - - if (UserGroupInformation.isSecurityEnabled()) { - Credentials credentials = null; - try { - credentials = parseCredentials(appContext); + try { + // create and recover app. 
+ RMAppImpl application = + createAndPopulateNewRMApp(appContext, appState.getSubmitTime(), + appState.getUser()); + application.recover(rmState); + + // If security is enabled and the application is NOT in a final state, + // parse the credentials and renew delegation token + if (UserGroupInformation.isSecurityEnabled() && + !isApplicationInFinalState(appState.getState())) { + Credentials credentials = parseCredentials(appContext); // synchronously renew delegation token on recovery. rmContext.getDelegationTokenRenewer().addApplicationSync(appId, - credentials, appContext.getCancelTokensWhenComplete()); - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); - } catch (Exception e) { - LOG.warn("Unable to parse and renew delegation tokens.", e); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(appId, e.getMessage())); - throw e; + credentials, appContext.getCancelTokensWhenComplete()); } - } else { + + // Actual recovery of the application application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); + } catch (Exception e) { + LOG.error("Failed to recover application " + appId, e); + // Fail the application if it is a running application. 
+ if (!isApplicationInFinalState(appState.getState())) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppRejectedEvent(appId, e.getMessage())); + } + throw e; } } @@ -436,7 +435,8 @@ protected Credentials parseCredentials(ApplicationSubmissionContext application) } return credentials; } - + + @SuppressWarnings("unchecked") @Override public void recover(RMState state) throws Exception { RMStateStore store = rmContext.getStateStore(); @@ -445,7 +445,19 @@ public void recover(RMState state) throws Exception { Map appStates = state.getApplicationState(); LOG.info("Recovering " + appStates.size() + " applications"); for (ApplicationState appState : appStates.values()) { - recoverApplication(appState, state); + try { + recoverApplication(appState, state); + } catch (ConnectException ce) { + // Unable to connect to HDFS or ZK. Assuming this is a transient + // issue, we should gracefully shutdown or transition to standby. If + // the issue is permanent, there is not much YARN can do. 
+ rmContext.getDispatcher().getEventHandler().handle( + new RMFatalEvent(RMFatalEventType.CONNECTION_FAILED, ce)); + // A fatal event was raised; recovering further apps is pointless. + break; + } catch (Exception e) { + LOG.error("Error recovering application " + appState, e); + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java index 0629c70..66bc87a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java @@ -27,5 +27,8 @@ STATE_STORE_OP_FAILED, // Source <- Embedded Elector - EMBEDDED_ELECTOR_FAILED + EMBEDDED_ELECTOR_FAILED, + + // Source <- RMAppManager + CONNECTION_FAILED } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 57c8fce..edd4ea0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -735,18 +735,33 @@ public void handle(RMFatalEvent event) { LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " + event.getType().name() + ". 
Cause:\n" + event.getCause()); - if (event.getType() == RMFatalEventType.STATE_STORE_FENCED) { - LOG.info("RMStateStore has been fenced"); - if (rmContext.isHAEnabled()) { - try { - // Transition to standby and reinit active services - LOG.info("Transitioning RM to Standby mode"); - rm.transitionToStandby(true); - rm.adminService.resetLeaderElection(); - return; - } catch (Exception e) { - LOG.fatal("Failed to transition RM to Standby mode."); - } + boolean shouldTransitionToStandby = false; + // Standby is only an option when HA is enabled. + if (rmContext.isHAEnabled()) { + switch (event.getType()) { + case STATE_STORE_FENCED: + LOG.info("RMStateStore has been fenced"); + shouldTransitionToStandby = true; + break; + case CONNECTION_FAILED: + shouldTransitionToStandby = true; + break; + default: + LOG.info("Event type " + event.getType().name() + " does not " + + "warrant transitioning to Standby. Shutting down."); + } + } + + if (shouldTransitionToStandby) { + try { + // Transition to standby and reinit active services + LOG.info("Transitioning RM to Standby mode"); + rm.transitionToStandby(true); + rm.adminService.resetLeaderElection(); + return; + } catch (Exception e) { + LOG.fatal( + "Failed to transition RM to Standby mode. Shutting down", e); } } @@ -1147,6 +1162,12 @@ protected RMSecretManagerService createRMSecretManagerService() { public ClientRMService getClientRMService() { return this.clientRM; } + + @Private + @VisibleForTesting + public RMAppManager getRmAppManager() { + return this.rmAppManager; + } /** * return the scheduler. 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index b5a6237..ae11b07 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -833,8 +833,10 @@ private void recoverAppAttemptCredentials(Credentials appAttemptTokens, if (UserGroupInformation.isSecurityEnabled()) { byte[] clientTokenMasterKeyBytes = appAttemptTokens.getSecretKey( RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME); - clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager() - .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); + if (clientTokenMasterKeyBytes != null) { + clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager() + .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); + } } this.amrmToken = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAppManager.java new file mode 100644 index 0000000..28786c4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAppManager.java @@ -0,0 +1,134 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; + + +import static org.junit.Assert.assertEquals; +import org.junit.After; +import org.junit.Test; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.net.ConnectException; + +public class TestRMAppManager { + private ResourceManager rm; + private RMAppManager appManager; + + @After + public void tearDown() { + // Stop the RM started by the test so it does not outlive the test. + if (rm != null) { + rm.stop(); + } + } + + @Test + public void testRecoveryFailureWithHA() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + 
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); + conf.set(YarnConfiguration.RM_HA_ID, "rm1"); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm1"), "0.0.0.0"); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm2"), "0.0.0.0"); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + for (int i = 2; i > 0; i--) { + ApplicationState appState = mock(ApplicationState.class); + when(appState.getAppId()).thenReturn(ApplicationId.newInstance(1234, i)); + memStore.getState().getApplicationState() + .put(appState.getAppId(), appState); + } + + rm = new MockRM(conf, memStore); + rm.start(); + rm.transitionToActive(); + + appManager = rm.getRmAppManager(); + RMAppManager appManagerSpy = spy(appManager); + + // Check recovery behavior on Exceptions + doThrow(new InvalidBlockTokenException()).when(appManagerSpy). + recoverApplication( + any(ApplicationState.class), any(RMStateStore.RMState.class)); + appManagerSpy.recover(memStore.getState()); + assertEquals("RM is not active any more", + HAServiceProtocol.HAServiceState.ACTIVE, + rm.getRMContext().getHAServiceState()); + + // Check recovery behavior on ConnectException + doThrow(new ConnectException()).when(appManagerSpy).recoverApplication + (any(ApplicationState.class), any(RMStateStore.RMState.class)); + appManagerSpy.recover(memStore.getState()); + + for (int i = 100; i > 0 && + rm.getRMContext().getHAServiceState() != + HAServiceProtocol.HAServiceState.STANDBY; i--) { + Thread.sleep(100); + } + assertEquals("RM continues to be active", + HAServiceProtocol.HAServiceState.STANDBY, + rm.getRMContext().getHAServiceState()); + } + + @Test + public void testRecoveryFailure() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + for (int i = 2; i > 0; i--) { + ApplicationState appState = mock(ApplicationState.class); 
+ when(appState.getAppId()).thenReturn(ApplicationId.newInstance(1234, i)); + memStore.getState().getApplicationState() + .put(appState.getAppId(), appState); + } + + rm = new MockRM(conf, memStore); + rm.start(); + rm.transitionToActive(); + + appManager = rm.getRmAppManager(); + RMAppManager appManagerSpy = spy(appManager); + + // Check recovery behavior on Exceptions + doThrow(new InvalidBlockTokenException()).when(appManagerSpy). + recoverApplication( + any(ApplicationState.class), any(RMStateStore.RMState.class)); + appManagerSpy.recover(memStore.getState()); + assertEquals("RM is not running any more", + Service.STATE.STARTED, rm.getServiceState()); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 80be22b..ad66f41 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -538,17 +538,13 @@ public void testCapacitySchedulerQueueRemovedRecovery() throws Exception { new CapacitySchedulerConfiguration(conf); setupQueueConfigurationOnlyA(csConf); rm2 = new MockRM(csConf, memStore); - boolean runtimeThrown = false; - try { - rm2.start(); - } catch (RuntimeException e) { - //we're catching it because we want to verify the message - //and we don't want to set it as an expected exception for the - //test because we only want it to happen here - assertTrue(e.getMessage().contains(B + " missing")); - runtimeThrown = true; + 
rm2.start(); + // Missing-queue app must be FAILED on rm2; TODO confirm app2 is its name + for (RMApp rmApp: rm2.getRMContext().getRMApps().values()) { + if (rmApp.getName().equals(app2)) { + assertEquals(RMAppState.FAILED, rmApp.getState()); + } } - assertTrue(runtimeThrown); } private void checkParentQueue(ParentQueue parentQueue, int numContainers,