diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index f1ebbfe..0b8b1c2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.api.records; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; @@ -26,6 +28,8 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; @@ -90,7 +94,7 @@ public static ApplicationSubmissionContext newInstance( context.setKeepContainersAcrossApplicationAttempts(keepContainers); context.setNodeLabelExpression(appLabelExpression); context.setResource(resource); - + ResourceRequest amReq = Records.newRecord(ResourceRequest.class); amReq.setResourceName(ResourceRequest.ANY); amReq.setCapability(resource); @@ -100,7 +104,7 @@ public static ApplicationSubmissionContext newInstance( context.setAMContainerResourceRequest(amReq); return context; } - + public static ApplicationSubmissionContext newInstance( ApplicationId applicationId, String applicationName, String queue, Priority priority, ContainerLaunchContext amContainer, @@ -120,8 +124,8 @@ public static ApplicationSubmissionContext newInstance( boolean isUnmanagedAM, boolean cancelTokensWhenComplete, int maxAppAttempts, Resource resource, String applicationType) { return newInstance(applicationId, applicationName, queue, priority, - amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, - resource, applicationType, false, null, null); + amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, + resource, applicationType, false, null, null); } @Public @@ -132,10 +136,10 @@ public static ApplicationSubmissionContext newInstance( boolean isUnmanagedAM, boolean cancelTokensWhenComplete, int maxAppAttempts, Resource resource) { return newInstance(applicationId, applicationName, queue, priority, - amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, - resource, null); + amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, + resource, null); } - + @Public @Stable public static ApplicationSubmissionContext newInstance( @@ -169,8 +173,9 @@ public static ApplicationSubmissionContext newInstance( boolean keepContainers, long attemptFailuresValidityInterval) { ApplicationSubmissionContext context = newInstance(applicationId, applicationName, queue, priority, - amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, - resource, applicationType, keepContainers); + amContainer, isUnmanagedAM, cancelTokensWhenComplete, + maxAppAttempts, + resource, applicationType, keepContainers); context.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); return context; } @@ -185,11 +190,26 @@ public static ApplicationSubmissionContext newInstance( boolean keepContainers, LogAggregationContext logAggregationContext) { ApplicationSubmissionContext context = newInstance(applicationId, applicationName, queue, priority, - amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, - resource, applicationType, keepContainers); + amContainer, isUnmanagedAM, cancelTokensWhenComplete, + maxAppAttempts, + resource, applicationType, keepContainers); context.setLogAggregationContext(logAggregationContext); return context; } + + @Private + public Credentials parseCredentials() throws IOException { + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + ByteBuffer tokens = getAMContainerSpec().getTokens(); + if(tokens!=null) { + dibb.reset(tokens); + credentials.readTokenStorageStream(dibb); + tokens.rewind(); + } + return credentials; + } + /** * Get the ApplicationId of the submitted application. * @return ApplicationId of the submitted application diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/QueueNotFoundException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/QueueNotFoundException.java new file mode 100644 index 0000000..14873dd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/QueueNotFoundException.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.exceptions; + +public class QueueNotFoundException extends YarnRuntimeException { + + public QueueNotFoundException(String message) { + super(message); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 63333b8..5cf1320 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.QueueNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; @@ -274,12 +275,11 @@ protected void submitApplication( ApplicationId appId = submissionContext.getApplicationId(); if (UserGroupInformation.isSecurityEnabled()) { - Credentials credentials = null; try { - credentials = parseCredentials(submissionContext); this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId, - credentials, submissionContext.getCancelTokensWhenComplete(), - application.getUser()); + submissionContext.parseCredentials(), + submissionContext.getCancelTokensWhenComplete(), + application.getUser()); } catch (Exception e) { LOG.warn("Unable to parse credentials.", e); // Sending APP_REJECTED is fine, since we assume that the @@ -312,31 +312,16 @@ protected void submitApplication( createAndPopulateNewRMApp(appContext, appState.getSubmitTime(), appState.getUser()); application.recover(rmState); - if (isApplicationInFinalState(appState.getState())) { - // We are synchronously moving the application into final state so that - // momentarily client will not see this application in NEW state. Also - // for finished applications we will avoid renewing tokens. - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); - return; - } - if (UserGroupInformation.isSecurityEnabled()) { - Credentials credentials = null; - try { - credentials = parseCredentials(appContext); - // synchronously renew delegation token on recovery. - rmContext.getDelegationTokenRenewer().addApplicationSync(appId, - credentials, appContext.getCancelTokensWhenComplete(), - application.getUser()); - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); - } catch (Exception e) { - LOG.warn("Unable to parse and renew delegation tokens.", e); + try { + application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); + } catch (Exception e) { + LOG.warn("Failed to recover application.", e); + if (!isApplicationInFinalState(appState.getState())) { this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(appId, e.getMessage())); - throw e; + .handle(new RMAppRejectedEvent(appId, e.getMessage())); } - } else { - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); + throw e; } } @@ -426,19 +411,7 @@ private boolean isApplicationInFinalState(RMAppState rmAppState) { } } - protected Credentials parseCredentials(ApplicationSubmissionContext application) - throws IOException { - Credentials credentials = new Credentials(); - DataInputByteBuffer dibb = new DataInputByteBuffer(); - ByteBuffer tokens = application.getAMContainerSpec().getTokens(); - if (tokens != null) { - dibb.reset(tokens); - credentials.readTokenStorageStream(dibb); - tokens.rewind(); - } - return credentials; - } - + @SuppressWarnings("unchecked") @Override public void recover(RMState state) throws Exception { RMStateStore store = rmContext.getStateStore(); @@ -447,7 +420,14 @@ public void recover(RMState state) throws Exception { Map appStates = state.getApplicationState(); LOG.info("Recovering " + appStates.size() + " applications"); for (ApplicationState appState : appStates.values()) { - recoverApplication(appState, state); + try { + recoverApplication(appState, state); + } catch (QueueNotFoundException qnfe) { + // Trying to recover app corresponding to a missing queue + throw qnfe; + } catch (Exception e) { + LOG.error("Error recovering application " + appState, e); + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 1994b36..2afe678 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; @@ -228,6 +229,11 @@ .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING, + RMAppEventType.APP_REJECTED, + new FinalSavingTransition(new AppRejectedTransition(), + RMAppState.FAILED)) + // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, @@ -825,6 +831,19 @@ private void recoverAppAttempts() { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { + if (UserGroupInformation.isSecurityEnabled()) { + // synchronously renew delegation token on recovery. + try { + app.rmContext.getDelegationTokenRenewer().addApplicationSync( + app.getApplicationId(), + app.submissionContext.parseCredentials(), + app.submissionContext.getCancelTokensWhenComplete(), + app.getUser()); + } catch (Exception e) { + return RMAppState.FAILED; + } + } + // The app has completed. if (app.recoveredFinalState != null) { app.recoverAppAttempts(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index b5a6237..ae11b07 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -833,8 +833,10 @@ private void recoverAppAttemptCredentials(Credentials appAttemptTokens, if (UserGroupInformation.isSecurityEnabled()) { byte[] clientTokenMasterKeyBytes = appAttemptTokens.getSecretKey( RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME); - clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager() - .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); + if (clientTokenMasterKeyBytes != null) { + clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager() + .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); + } } this.amrmToken = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 9332228..5f1f645 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.QueueNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; @@ -676,15 +677,13 @@ private synchronized void addApplication(ApplicationId applicationId, //During a restart, this indicates a queue was removed, which is //not presently supported if (isAppRecovering) { - //throwing RuntimeException because some other exceptions are caught - //(including YarnRuntimeException) and we want this to force an exit - String queueErrorMsg = "Queue named " + queueName + String queueErrorMsg = "Queue named " + queueName + " missing during application recovery." + " Queue removal during recovery is not presently supported by the" + " capacity scheduler, please restart with all queues configured" + " which were present before shutdown/restart."; LOG.fatal(queueErrorMsg); - throw new RuntimeException(queueErrorMsg); + throw new QueueNotFoundException(queueErrorMsg); } String message = "Application " + applicationId + " submitted by user " + user + " to unknown queue: " + queueName; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 9d0ac27..9f127fb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -354,14 +354,6 @@ public RMApp submitApp(int masterMemory, String name, String user, LogAggregationContext logAggregationContext) throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; - ApplicationClientProtocol client = getClientRMService(); - if (! isAppIdProvided) { - GetNewApplicationResponse resp = client.getNewApplication(Records - .newRecord(GetNewApplicationRequest.class)); - appId = resp.getApplicationId(); - } - SubmitApplicationRequest req = Records - .newRecord(SubmitApplicationRequest.class); ApplicationSubmissionContext sub = Records .newRecord(ApplicationSubmissionContext.class); sub.setKeepContainersAcrossApplicationAttempts(keepContainers); @@ -392,31 +384,47 @@ public RMApp submitApp(int masterMemory, String name, String user, if (logAggregationContext != null) { sub.setLogAggregationContext(logAggregationContext); } + + return submitApp(user, sub, waitForAccepted); + } + + public RMApp submitApp(String user, ApplicationSubmissionContext sub, + boolean waitForAccepted) throws Exception { + ApplicationClientProtocol client = getClientRMService(); + if (sub.getApplicationId() == null) { + GetNewApplicationResponse resp = client.getNewApplication(Records + .newRecord(GetNewApplicationRequest.class)); + sub.setApplicationId(resp.getApplicationId()); + } + ApplicationId appId = sub.getApplicationId(); + + SubmitApplicationRequest req = Records + .newRecord(SubmitApplicationRequest.class); req.setApplicationSubmissionContext(sub); UserGroupInformation fakeUser = - UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"}); + UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"}); PrivilegedAction action = - new PrivilegedAction() { - ApplicationClientProtocol client; - SubmitApplicationRequest req; - @Override - public SubmitApplicationResponse run() { - try { - return client.submitApplication(req); - } catch (YarnException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - PrivilegedAction setClientReq( - ApplicationClientProtocol client, SubmitApplicationRequest req) { - this.client = client; - this.req = req; - return this; - } - }.setClientReq(client, req); + new PrivilegedAction() { + ApplicationClientProtocol client; + SubmitApplicationRequest req; + @Override + public SubmitApplicationResponse run() { + try { + return client.submitApplication(req); + } catch (YarnException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + PrivilegedAction setClientReq( + ApplicationClientProtocol client, SubmitApplicationRequest req) { + this.client = client; + this.req = req; + return this; + } + }.setClientReq(client, req); fakeUser.doAs(action); // make sure app is immediately available after submit if (waitForAccepted) { @@ -425,7 +433,7 @@ public SubmitApplicationResponse run() { RMApp rmApp = getRMContext().getRMApps().get(appId); // unmanaged AM won't go to RMAppAttemptState.SCHEDULED. - if (waitForAccepted && !unmanaged) { + if (waitForAccepted && !sub.getUnmanagedAM()) { waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.SCHEDULED); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAppManager.java new file mode 100644 index 0000000..7c4ab8d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAppManager.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.QueueNotFoundException; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; + + +import static org.junit.Assert.assertEquals; +import org.junit.Test; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +public class TestRMAppManager { + private MockRM rm; + private RMAppManager appManager; + + @Test (expected = QueueNotFoundException.class) + public void testRecoveryFailureWithHA() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); + conf.set(YarnConfiguration.RM_HA_ID, "rm1"); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm1"), "0.0.0.0"); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm2"), "0.0.0.0"); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + for (int i = 2; i > 0; i--) { + ApplicationState appState = mock(ApplicationState.class); + when(appState.getAppId()).thenReturn(ApplicationId.newInstance(1234, i)); + memStore.getState().getApplicationState() + .put(appState.getAppId(), appState); + } + + rm = new MockRM(conf, memStore); + rm.start(); + rm.transitionToActive(); + + appManager = rm.getRMAppManager(); + RMAppManager appManagerSpy = spy(appManager); + + // Check recovery behavior on Exceptions + doThrow(new InvalidBlockTokenException()).when(appManagerSpy). + recoverApplication( + any(ApplicationState.class), any(RMStateStore.RMState.class)); + appManagerSpy.recover(memStore.getState()); + assertEquals("RM is not active any more", + HAServiceProtocol.HAServiceState.ACTIVE, + rm.getRMContext().getHAServiceState()); + + // Check recovery behavior on ConnectException + doThrow(new QueueNotFoundException("Missing queue")) + .when(appManagerSpy).recoverApplication( + any(ApplicationState.class), any(RMStateStore.RMState.class)); + appManagerSpy.recover(memStore.getState()); + } + + @Test (expected = QueueNotFoundException.class) + public void testRecoveryFailure() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + for (int i = 2; i > 0; i--) { + ApplicationState appState = mock(ApplicationState.class); + when(appState.getAppId()).thenReturn(ApplicationId.newInstance(1234, i)); + memStore.getState().getApplicationState() + .put(appState.getAppId(), appState); + } + + rm = new MockRM(conf, memStore); + rm.start(); + rm.transitionToActive(); + + appManager = rm.getRMAppManager(); + RMAppManager appManagerSpy = spy(appManager); + + // Check recovery behavior on Exceptions + doThrow(new InvalidBlockTokenException()).when(appManagerSpy). + recoverApplication( + any(ApplicationState.class), any(RMStateStore.RMState.class)); + appManagerSpy.recover(memStore.getState()); + assertEquals("RM is not running any more", + Service.STATE.STARTED, rm.getServiceState()); + + // Check recovery behavior on ConnectException + doThrow(new QueueNotFoundException("Missing queue")) + .when(appManagerSpy).recoverApplication( + any(ApplicationState.class), any(RMStateStore.RMState.class)); + appManagerSpy.recover(memStore.getState()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 9502eba..2701fba 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; @@ -99,6 +101,7 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -1611,33 +1614,21 @@ public void testAppFailedOnSubmissionSavedInStateStore() throws Exception { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); - MockRM rm1 = new TestSecurityMockRM(conf, memStore) { - @Override - protected RMAppManager createRMAppManager() { - return new TestRMAppManager(this.rmContext, this.scheduler, - this.masterService, this.applicationACLsManager, conf); - } + MockRM rm1 = new TestSecurityMockRM(conf, memStore); + rm1.start(); - class TestRMAppManager extends RMAppManager { + ApplicationSubmissionContext asc = ApplicationSubmissionContext + .newInstance( + ApplicationId.newInstance(0, 1), "app", "queue", + Priority.newInstance(1), mock(ContainerLaunchContext.class), + false, false, 2, Resources.createResource(256)); - public TestRMAppManager(RMContext context, YarnScheduler scheduler, - ApplicationMasterService masterService, - ApplicationACLsManager applicationACLsManager, Configuration conf) { - super(context, scheduler, masterService, applicationACLsManager, conf); - } + ApplicationSubmissionContext context = spy(asc); + doThrow(new IOException("Parsing credential error.")). + when(context).parseCredentials(); + + RMApp app1 = rm1.submitApp("user", context, false); - @Override - protected Credentials parseCredentials( - ApplicationSubmissionContext application) throws IOException { - throw new IOException("Parsing credential error."); - } - } - }; - rm1.start(); - RMApp app1 = - rm1.submitApp(200, "name", "user", - new HashMap(), false, "default", -1, - null, "MAPREDUCE", false); rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED); // Check app staet is saved in state store. Assert.assertEquals(RMAppState.FAILED, memStore.getState() diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 85d3895..8157f52 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -37,6 +37,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.Service; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.QueueNotFoundException; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; @@ -570,10 +572,10 @@ public void testCapacitySchedulerRecovery() throws Exception { // submission //2. Remove one of the queues, restart the RM //3. Verify that the expected exception was thrown - @Test (timeout = 30000) + @Test (timeout = 30000, expected = QueueNotFoundException.class) public void testCapacitySchedulerQueueRemovedRecovery() throws Exception { if (!schedulerClass.equals(CapacityScheduler.class)) { - return; + throw new QueueNotFoundException("Dummy"); } conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, @@ -614,17 +616,7 @@ public void testCapacitySchedulerQueueRemovedRecovery() throws Exception { new CapacitySchedulerConfiguration(conf); setupQueueConfigurationOnlyA(csConf); rm2 = new MockRM(csConf, memStore); - boolean runtimeThrown = false; - try { - rm2.start(); - } catch (RuntimeException e) { - //we're catching it because we want to verify the message - //and we don't want to set it as an expected exception for the - //test because we only want it to happen here - assertTrue(e.getMessage().contains(B + " missing")); - runtimeThrown = true; - } - assertTrue(runtimeThrown); + rm2.start(); } private void checkParentQueue(ParentQueue parentQueue, int numContainers, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 6a66385..265815a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -35,6 +35,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -73,6 +74,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; + +import org.apache.hadoop.yarn.server.resourcemanager.security + .DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -199,10 +203,11 @@ public void setUp() throws Exception { AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class); store = mock(RMStateStore.class); writer = mock(RMApplicationHistoryWriter.class); - RMContext realRMContext = + DelegationTokenRenewer renewer = mock(DelegationTokenRenewer.class); + RMContext realRMContext = new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, - null, new AMRMTokenSecretManager(conf, this.rmContext), + renewer, new AMRMTokenSecretManager(conf, this.rmContext), new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM(), @@ -248,7 +253,12 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext) new ApplicationMasterService(rmContext, scheduler); if(submissionContext == null) { - submissionContext = new ApplicationSubmissionContextPBImpl(); + submissionContext = new ApplicationSubmissionContextPBImpl() { + @Override + public Credentials parseCredentials() { + return null; + } + }; } // applicationId will not be used because RMStateStore is mocked, // but applicationId is still set for safety