diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/QueueNotFoundException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/QueueNotFoundException.java new file mode 100644 index 0000000..14873dd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/QueueNotFoundException.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.exceptions; + +public class QueueNotFoundException extends YarnRuntimeException { + + public QueueNotFoundException(String message) { + super(message); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 63333b8..da662de 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.QueueNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; @@ -312,31 +313,23 @@ protected void submitApplication( createAndPopulateNewRMApp(appContext, appState.getSubmitTime(), appState.getUser()); application.recover(rmState); - if (isApplicationInFinalState(appState.getState())) { - // We are synchronously moving the application into final state so that - // momentarily client will not see this application in NEW state. Also - // for finished applications we will avoid renewing tokens. - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); - return; - } - if (UserGroupInformation.isSecurityEnabled()) { - Credentials credentials = null; - try { - credentials = parseCredentials(appContext); + try { + if (UserGroupInformation.isSecurityEnabled()) { + Credentials credentials = parseCredentials(appContext); // synchronously renew delegation token on recovery. rmContext.getDelegationTokenRenewer().addApplicationSync(appId, - credentials, appContext.getCancelTokensWhenComplete(), - application.getUser()); - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); - } catch (Exception e) { - LOG.warn("Unable to parse and renew delegation tokens.", e); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(appId, e.getMessage())); - throw e; + credentials, appContext.getCancelTokensWhenComplete(), + application.getUser()); } - } else { application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); + } catch (Exception e) { + LOG.warn("Failed to recover application.", e); + if (!isApplicationInFinalState(appState.getState())) { + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(appId, e.getMessage())); + } + throw e; } } @@ -438,7 +431,8 @@ protected Credentials parseCredentials(ApplicationSubmissionContext application) } return credentials; } - + + @SuppressWarnings("unchecked") @Override public void recover(RMState state) throws Exception { RMStateStore store = rmContext.getStateStore(); @@ -447,7 +441,15 @@ public void recover(RMState state) throws Exception { Map appStates = state.getApplicationState(); LOG.info("Recovering " + appStates.size() + " applications"); for (ApplicationState appState : appStates.values()) { - recoverApplication(appState, state); + try { + recoverApplication(appState, state); + } catch (QueueNotFoundException qnfe) { + // Trying to recover app corresponding to a missing queue + rmContext.getDispatcher().getEventHandler().handle( + new RMFatalEvent(RMFatalEventType.QUEUE_NOT_FOUND, qnfe)); + } catch (Exception e) { + LOG.error("Error recovering application " + appState, e); + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java index 0629c70..17e8662 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java @@ -27,5 +27,8 @@ STATE_STORE_OP_FAILED, // Source <- Embedded Elector - EMBEDDED_ELECTOR_FAILED + EMBEDDED_ELECTOR_FAILED, + + // Source <- RMAppManager + QUEUE_NOT_FOUND } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 57c8fce..796b6f1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -735,22 +735,36 @@ public void handle(RMFatalEvent event) { LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " + event.getType().name() + ". Cause:\n" + event.getCause()); - if (event.getType() == RMFatalEventType.STATE_STORE_FENCED) { - LOG.info("RMStateStore has been fenced"); - if (rmContext.isHAEnabled()) { - try { - // Transition to standby and reinit active services - LOG.info("Transitioning RM to Standby mode"); - rm.transitionToStandby(true); - rm.adminService.resetLeaderElection(); - return; - } catch (Exception e) { - LOG.fatal("Failed to transition RM to Standby mode."); - } + boolean shouldTransitionToStandby = false; + if (rmContext.isHAEnabled()) { + switch (event.getType()) { + case STATE_STORE_FENCED: + LOG.info("RMStateStore has been fenced"); + shouldTransitionToStandby = true; + break; + case QUEUE_NOT_FOUND: + shouldTransitionToStandby = true; + break; + default: + LOG.info("Event type " + event.getType().name() + " does not " + + "warrant transitioning to Standby. Shutting down."); + } + } + + if (shouldTransitionToStandby) { + try { + // Transition to standby and reinit active services + LOG.info("Transitioning RM to Standby mode"); + rm.transitionToStandby(true); + rm.adminService.resetLeaderElection(); + return; + } catch (Exception e) { + LOG.fatal( + "Failed to transition RM to Standby mode. Shutting down"); } } - ExitUtil.terminate(1, event.getCause()); + rm.shutdown(1, event.getCause()); } } @@ -1117,6 +1131,16 @@ protected void serviceStop() throws Exception { transitionToStandby(false); rmContext.setHAServiceState(HAServiceState.STOPPING); } + + @Private + @VisibleForTesting + protected void shutdown(int status, String message) { + try { + this.stop(); + } finally { + ExitUtil.terminate(status, message); + } + } protected ResourceTrackerService createResourceTrackerService() { return new ResourceTrackerService(this.rmContext, this.nodesListManager, @@ -1147,7 +1171,7 @@ protected RMSecretManagerService createRMSecretManagerService() { public ClientRMService getClientRMService() { return this.clientRM; } - + /** * return the scheduler. * @return the scheduler for the Resource Manager. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index b5a6237..ae11b07 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -833,8 +833,10 @@ private void recoverAppAttemptCredentials(Credentials appAttemptTokens, if (UserGroupInformation.isSecurityEnabled()) { byte[] clientTokenMasterKeyBytes = appAttemptTokens.getSecretKey( RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME); - clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager() - .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); + if (clientTokenMasterKeyBytes != null) { + clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager() + .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); + } } this.amrmToken = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 9332228..5f1f645 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.QueueNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; @@ -676,15 +677,13 @@ private synchronized void addApplication(ApplicationId applicationId, //During a restart, this indicates a queue was removed, which is //not presently supported if (isAppRecovering) { - //throwing RuntimeException because some other exceptions are caught - //(including YarnRuntimeException) and we want this to force an exit - String queueErrorMsg = "Queue named " + queueName + String queueErrorMsg = "Queue named " + queueName + " missing during application recovery." + " Queue removal during recovery is not presently supported by the" + " capacity scheduler, please restart with all queues configured" + " which were present before shutdown/restart."; LOG.fatal(queueErrorMsg); - throw new RuntimeException(queueErrorMsg); + throw new QueueNotFoundException(queueErrorMsg); } String message = "Application " + applicationId + " submitted by user " + user + " to unknown queue: " + queueName; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 9d0ac27..416aef4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -640,6 +640,11 @@ protected void startWepApp() { // Disable webapp } + @Override + protected void shutdown(int status, String message) { + super.stop(); + } + public static void finishAMAndVerifyAppState(RMApp rmApp, MockRM rm, MockNM nm, MockAM am) throws Exception { FinishApplicationMasterRequest req = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAppManager.java new file mode 100644 index 0000000..6c5dc73 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAppManager.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.QueueNotFoundException; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; + + +import static org.junit.Assert.assertEquals; +import org.junit.Test; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.net.ConnectException; + +public class TestRMAppManager { + private MockRM rm; + private RMAppManager appManager; + + @Test + public void testRecoveryFailureWithHA() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); + conf.set(YarnConfiguration.RM_HA_ID, "rm1"); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm1"), "0.0.0.0"); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm2"), "0.0.0.0"); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + for (int i = 2; i > 0; i--) { + ApplicationState appState = mock(ApplicationState.class); + when(appState.getAppId()).thenReturn(ApplicationId.newInstance(1234, i)); + memStore.getState().getApplicationState() + .put(appState.getAppId(), appState); + } + + rm = new MockRM(conf, memStore); + rm.start(); + rm.transitionToActive(); + + appManager = rm.getRMAppManager(); + RMAppManager appManagerSpy = spy(appManager); + + // Check recovery behavior on Exceptions + doThrow(new InvalidBlockTokenException()).when(appManagerSpy). + recoverApplication( + any(ApplicationState.class), any(RMStateStore.RMState.class)); + appManagerSpy.recover(memStore.getState()); + assertEquals("RM is not active any more", + HAServiceProtocol.HAServiceState.ACTIVE, + rm.getRMContext().getHAServiceState()); + + // Check recovery behavior on ConnectException + doThrow(new QueueNotFoundException("Missing queue")) + .when(appManagerSpy).recoverApplication( + any(ApplicationState.class), any(RMStateStore.RMState.class)); + appManagerSpy.recover(memStore.getState()); + + for (int i = 100; i > 0 && + rm.getRMContext().getHAServiceState() != + HAServiceProtocol.HAServiceState.STANDBY; i--) { + Thread.sleep(100); + } + assertEquals("RM continues to be active", + HAServiceProtocol.HAServiceState.STANDBY, + rm.getRMContext().getHAServiceState()); + } + + @Test + public void testRecoveryFailure() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + for (int i = 2; i > 0; i--) { + ApplicationState appState = mock(ApplicationState.class); + when(appState.getAppId()).thenReturn(ApplicationId.newInstance(1234, i)); + memStore.getState().getApplicationState() + .put(appState.getAppId(), appState); + } + + rm = new MockRM(conf, memStore); + rm.start(); + rm.transitionToActive(); + + appManager = rm.getRMAppManager(); + RMAppManager appManagerSpy = spy(appManager); + + // Check recovery behavior on Exceptions + doThrow(new InvalidBlockTokenException()).when(appManagerSpy). + recoverApplication( + any(ApplicationState.class), any(RMStateStore.RMState.class)); + appManagerSpy.recover(memStore.getState()); + assertEquals("RM is not running any more", + Service.STATE.STARTED, rm.getServiceState()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 85d3895..e20a7c1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -37,6 +37,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.Service; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -614,17 +615,14 @@ public void testCapacitySchedulerQueueRemovedRecovery() throws Exception { new CapacitySchedulerConfiguration(conf); setupQueueConfigurationOnlyA(csConf); rm2 = new MockRM(csConf, memStore); - boolean runtimeThrown = false; - try { - rm2.start(); - } catch (RuntimeException e) { - //we're catching it because we want to verify the message - //and we don't want to set it as an expected exception for the - //test because we only want it to happen here - assertTrue(e.getMessage().contains(B + " missing")); - runtimeThrown = true; + rm2.start(); + + for (int retries = 1000; + retries > 0 && !rm2.isInState(Service.STATE.STOPPED); + retries--) { + Thread.sleep(100); } - assertTrue(runtimeThrown); + assertTrue("RM still running", rm2.isInState(Service.STATE.STOPPED)); } private void checkParentQueue(ParentQueue parentQueue, int numContainers,