diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 8dcfe67..a7cebe8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -281,23 +281,24 @@ protected void submitApplication( RMAppImpl application = createAndPopulateNewRMApp(submissionContext, submitTime, user); ApplicationId appId = submissionContext.getApplicationId(); + Credentials credentials = null; + try { + credentials = parseCredentials(submissionContext); + } catch (Exception e) { + LOG.warn("Unable to parse credentials.", e); + // Sending APP_REJECTED is fine, since we assume that the + // RMApp is in NEW state and thus we haven't yet informed the + // scheduler about the existence of the application + assert application.getState() == RMAppState.NEW; + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, e.getMessage())); + throw RPCUtil.getRemoteException(e); + } if (UserGroupInformation.isSecurityEnabled()) { - try { - this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId, - parseCredentials(submissionContext), - submissionContext.getCancelTokensWhenComplete(), - application.getUser()); - } catch (Exception e) { - LOG.warn("Unable to parse credentials.", e); - // Sending APP_REJECTED is fine, since we assume that the - // RMApp is in NEW state and thus we haven't yet informed the - // scheduler about the existence of the application - assert application.getState() == RMAppState.NEW; - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, e.getMessage())); - throw RPCUtil.getRemoteException(e); - } + this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId, + credentials, submissionContext.getCancelTokensWhenComplete(), + application.getUser()); } else { // Dispatcher is not yet started at this time, so these START events // enqueued should be guaranteed to be first processed when dispatcher diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index 0dd9ba1..884a6e1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -220,10 +220,12 @@ private void setupTokens( Credentials credentials = new Credentials(); DataInputByteBuffer dibb = new DataInputByteBuffer(); - if (container.getTokens() != null) { + ByteBuffer tokens = container.getTokens(); + if (tokens != null) { // TODO: Don't do this kind of checks everywhere. - dibb.reset(container.getTokens()); + dibb.reset(tokens); credentials.readTokenStorageStream(dibb); + tokens.rewind(); } // Add AMRMToken diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index d2ac4ef..1fc4f0e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -21,6 +21,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; @@ -33,6 +35,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.concurrent.ConcurrentMap; @@ -478,6 +481,61 @@ public void testRMAppSubmit() throws Exception { getAppEventType()); } + @Test + public void testRMAppSubmitWithInvalidTokens() throws Exception { + // Setup invalid security tokens + DataOutputBuffer dob = new DataOutputBuffer(); + ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, + dob.getLength()); + asContext.getAMContainerSpec().setTokens(securityTokens); + try { + appMonitor.submitApplication(asContext, "test"); + Assert.fail("Application submission should fail because" + + " Tokens are invalid."); + } catch (YarnException e) { + // Exception is expected + Assert.assertTrue("The thrown exception is not" + + " java.io.EOFException", + e.getMessage().contains("java.io.EOFException")); + } + int timeoutSecs = 0; + while ((getAppEventType() == RMAppEventType.KILL) && + timeoutSecs++ < 20) { + Thread.sleep(1000); + } + Assert.assertEquals("app event type sent is wrong", + RMAppEventType.APP_REJECTED, getAppEventType()); + asContext.getAMContainerSpec().setTokens(null); + } + + @Test + public void testRMAppSubmitWithValidTokens() throws Exception { + // Setup valid security tokens + DataOutputBuffer dob = new DataOutputBuffer(); + Credentials credentials = new Credentials(); + credentials.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, + dob.getLength()); + asContext.getAMContainerSpec().setTokens(securityTokens); + appMonitor.submitApplication(asContext, "test"); + RMApp app = rmContext.getRMApps().get(appId); + Assert.assertNotNull("app is null", app); + Assert.assertEquals("app id doesn't match", appId, app.getApplicationId()); + Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState()); + verify(metricsPublisher).appACLsUpdated( + any(RMApp.class), any(String.class), anyLong()); + + // wait for event to be processed + int timeoutSecs = 0; + while ((getAppEventType() == RMAppEventType.KILL) && + timeoutSecs++ < 20) { + Thread.sleep(1000); + } + Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, + getAppEventType()); + asContext.getAMContainerSpec().setTokens(null); + } + @Test (timeout = 30000) public void testRMAppSubmitMaxAppAttempts() throws Exception { int[] globalMaxAppAttempts = new int[] { 10, 1 };