diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 63333b8..5ae5d6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -274,12 +274,11 @@ protected void submitApplication( ApplicationId appId = submissionContext.getApplicationId(); if (UserGroupInformation.isSecurityEnabled()) { - Credentials credentials = null; try { - credentials = parseCredentials(submissionContext); this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId, - credentials, submissionContext.getCancelTokensWhenComplete(), - application.getUser()); + parseCredentials(submissionContext), + submissionContext.getCancelTokensWhenComplete(), + application.getUser()); } catch (Exception e) { LOG.warn("Unable to parse credentials.", e); // Sending APP_REJECTED is fine, since we assume that the @@ -299,10 +298,8 @@ protected void submitApplication( } } - @SuppressWarnings("unchecked") - protected void - recoverApplication(ApplicationState appState, RMState rmState) - throws Exception { + protected void recoverApplication(ApplicationState appState, RMState rmState) + throws Exception { ApplicationSubmissionContext appContext = appState.getApplicationSubmissionContext(); ApplicationId appId = appState.getAppId(); @@ -312,32 +309,7 @@ protected void submitApplication( createAndPopulateNewRMApp(appContext, appState.getSubmitTime(), appState.getUser()); application.recover(rmState); - if (isApplicationInFinalState(appState.getState())) { - // We are synchronously moving the application into final state so that - // momentarily client will not see this application in NEW state. Also - // for finished applications we will avoid renewing tokens. - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); - return; - } - - if (UserGroupInformation.isSecurityEnabled()) { - Credentials credentials = null; - try { - credentials = parseCredentials(appContext); - // synchronously renew delegation token on recovery. - rmContext.getDelegationTokenRenewer().addApplicationSync(appId, - credentials, appContext.getCancelTokensWhenComplete(), - application.getUser()); - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); - } catch (Exception e) { - LOG.warn("Unable to parse and renew delegation tokens.", e); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(appId, e.getMessage())); - throw e; - } - } else { - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); - } + application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); } private RMAppImpl createAndPopulateNewRMApp( @@ -416,18 +388,9 @@ private ResourceRequest validateAndCreateResourceRequest( return null; } - - private boolean isApplicationInFinalState(RMAppState rmAppState) { - if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED - || rmAppState == RMAppState.KILLED) { - return true; - } else { - return false; - } - } - protected Credentials parseCredentials(ApplicationSubmissionContext application) - throws IOException { + protected Credentials parseCredentials( + ApplicationSubmissionContext application) throws IOException { Credentials credentials = new Credentials(); DataInputByteBuffer dibb = new DataInputByteBuffer(); ByteBuffer tokens = application.getAMContainerSpec().getTokens(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 1994b36..04ead08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -18,8 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -36,6 +38,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -832,6 +836,19 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { return app.recoveredFinalState; } + if (UserGroupInformation.isSecurityEnabled()) { + // synchronously renew delegation token on recovery. + try { + app.rmContext.getDelegationTokenRenewer().addApplicationSync( + app.getApplicationId(), app.parseCredentials(), + app.submissionContext.getCancelTokensWhenComplete(), app.getUser()); + } catch (Exception e) { + LOG.info("Failed to renew delegation token on recovery for " + + app.applicationId, e); + return RMAppState.FAILED; + } + } + // No existent attempts means the attempt associated with this app was not // started or started but not yet saved. if (app.attempts.isEmpty()) { @@ -1296,4 +1313,16 @@ public void setSystemClock(Clock clock) { public ReservationId getReservationId() { return submissionContext.getReservationID(); } + + protected Credentials parseCredentials() throws IOException { + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + ByteBuffer tokens = submissionContext.getAMContainerSpec().getTokens(); + if (tokens != null) { + dibb.reset(tokens); + credentials.readTokenStorageStream(dibb); + tokens.rewind(); + } + return credentials; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index b5a6237..ae11b07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -833,8 +833,10 @@ private void recoverAppAttemptCredentials(Credentials appAttemptTokens, if (UserGroupInformation.isSecurityEnabled()) { byte[] clientTokenMasterKeyBytes = appAttemptTokens.getSecretKey( RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME); - clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager() - .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); + if (clientTokenMasterKeyBytes != null) { + clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager() + .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); + } } this.amrmToken = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueNotFoundException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueNotFoundException.java new file mode 100644 index 0000000..35a1d66 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueNotFoundException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +@Private +public class QueueNotFoundException extends YarnRuntimeException { + + private static final long serialVersionUID = 187239430L; + + public QueueNotFoundException(String message) { + super(message); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 9332228..c383e43 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping; @@ -676,15 +677,13 @@ private synchronized void addApplication(ApplicationId applicationId, //During a restart, this indicates a queue was removed, which is //not presently supported if (isAppRecovering) { - //throwing RuntimeException because some other exceptions are caught - //(including YarnRuntimeException) and we want this to force an exit - String queueErrorMsg = "Queue named " + queueName + String queueErrorMsg = "Queue named " + queueName + " missing during application recovery." + " Queue removal during recovery is not presently supported by the" + " capacity scheduler, please restart with all queues configured" + " which were present before shutdown/restart."; LOG.fatal(queueErrorMsg); - throw new RuntimeException(queueErrorMsg); + throw new QueueNotFoundException(queueErrorMsg); } String message = "Application " + applicationId + " submitted by user " + user + " to unknown queue: " + queueName; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 85d3895..536dbd7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -37,6 +37,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.Service; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -570,10 +572,10 @@ public void testCapacitySchedulerRecovery() throws Exception { // submission //2. Remove one of the queues, restart the RM //3. Verify that the expected exception was thrown - @Test (timeout = 30000) + @Test (timeout = 30000, expected = QueueNotFoundException.class) public void testCapacitySchedulerQueueRemovedRecovery() throws Exception { if (!schedulerClass.equals(CapacityScheduler.class)) { - return; + throw new QueueNotFoundException("Dummy"); } conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, @@ -614,17 +616,7 @@ public void testCapacitySchedulerQueueRemovedRecovery() throws Exception { new CapacitySchedulerConfiguration(conf); setupQueueConfigurationOnlyA(csConf); rm2 = new MockRM(csConf, memStore); - boolean runtimeThrown = false; - try { - rm2.start(); - } catch (RuntimeException e) { - //we're catching it because we want to verify the message - //and we don't want to set it as an expected exception for the - //test because we only want it to happen here - assertTrue(e.getMessage().contains(B + " missing")); - runtimeThrown = true; - } - assertTrue(runtimeThrown); + rm2.start(); } private void checkParentQueue(ParentQueue parentQueue, int numContainers, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 6a66385..9e8ada1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.verify; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.Map; @@ -35,6 +36,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -43,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; @@ -73,9 +77,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -199,10 +205,11 @@ public void setUp() throws Exception { AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class); store = mock(RMStateStore.class); writer = mock(RMApplicationHistoryWriter.class); + DelegationTokenRenewer renewer = mock(DelegationTokenRenewer.class); RMContext realRMContext = new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, - null, new AMRMTokenSecretManager(conf, this.rmContext), + renewer, new AMRMTokenSecretManager(conf, this.rmContext), new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM(), @@ -514,7 +521,18 @@ public void testAppSuccessPath() throws IOException { @Test (timeout = 30000) public void testAppRecoverPath() throws IOException { LOG.info("--- START: testAppRecoverPath ---"); - testCreateAppSubmittedRecovery(null); + ApplicationSubmissionContext sub = + Records.newRecord(ApplicationSubmissionContext.class); + ContainerLaunchContext clc = + Records.newRecord(ContainerLaunchContext.class); + Credentials credentials = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + clc.setTokens(securityTokens); + sub.setAMContainerSpec(clc); + testCreateAppSubmittedRecovery(sub); } @Test (timeout = 30000)