diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index fd8935debbc..06756f065aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -525,9 +525,10 @@ private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt) requestNewHdfsDelegationTokenAsProxyUser( Arrays.asList(applicationId), evt.getUser(), evt.shouldCancelAtEnd()); - continue; + } else { + LOG.error("Failed to renew token: " + dttr.token, ioe); } - throw new IOException("Failed to renew token: " + dttr.token, ioe); + continue; } } tokenList.add(dttr); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 31a87cb71be..307edbfe137 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -39,6 +39,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; @@ -1156,6 +1157,145 @@ protected void renewToken(final DelegationTokenToRenew dttr) Assert.assertTrue(appCredentials.getAllTokens().contains(updatedToken)); } + private Token getNonHDFSTokenToBeFirst( + Text userText, Token token) { + Random rand = new Random(); + while (true) { + // create non-hdfs token + DelegationTokenIdentifier nonHdfsDTId = new DelegationTokenIdentifier( + userText, new Text("renewer1"), userText); + Text service = new Text("testing" + rand.nextInt()); + Token nonHdfsToken = + new Token<>(nonHdfsDTId.getBytes(), "password1".getBytes(), + new Text("TESTING-TOKEN-KIND"), service); + // we should guarantee non-hdfs token is first. if not, retry it. + Credentials credentials = new Credentials(); + credentials.addToken(token.getService(), token); + credentials.addToken(nonHdfsToken.getService(), nonHdfsToken); + if (credentials.getAllTokens().iterator().next().getService().equals( + service)) { + return nonHdfsToken; + } + } + } + + // 1. tokens including HDFS DT and non-HDFS DT are expired before app completes + // 2. RM shutdown. + // 3. When RM recovers the app, even though renewal for non-HDFS token is + // failed first, RM should request a new token for HDFS token and sent it + // to NM for log-aggregation. + @Test + public void testRMRestartWithMultipleExpiredTokens() throws Exception { + Configuration yarnConf = new YarnConfiguration(); + yarnConf + .setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true); + yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + yarnConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + yarnConf + .set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + UserGroupInformation.setConfiguration(yarnConf); + + // create Token1 + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1, + new Text("renewer1"), userText1); + final Token originalToken = + new Token<>(dtId1.getBytes(), "password2".getBytes(), dtId1.getKind(), + new Text("service1")); + + // put non-HDFS token in front + final Token nonHdfsToken = + getNonHDFSTokenToBeFirst(userText1, originalToken); + Credentials credentials = new Credentials(); + credentials.addToken(nonHdfsToken.getService(), nonHdfsToken); + credentials.addToken(originalToken.getService(), originalToken); + + MockRM rm1 = new TestSecurityMockRM(yarnConf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); + rm1.start(); + RMApp app = MockRMAppSubmitter.submit(rm1, + MockRMAppSubmissionData.Builder.createWithMemory(200, rm1) + .withAppName("name") + .withUser("user") + .withAcls(new HashMap()) + .withUnmanagedAM(false) + .withQueue("default") + .withMaxAppAttempts(1) + .withCredentials(credentials) + .build()); + + // create token2 + Text userText2 = new Text("user1"); + DelegationTokenIdentifier dtId2 = + new DelegationTokenIdentifier(userText1, new Text("renewer2"), + userText2); + final Token updatedToken = + new Token(dtId2.getBytes(), + "password2".getBytes(), dtId2.getKind(), new Text("service2")); + AtomicBoolean firstRenewInvoked = new AtomicBoolean(false); + AtomicBoolean secondRenewInvoked = new AtomicBoolean(false); + AtomicBoolean nonHdfsTokenRenewInvoked = new AtomicBoolean(false); + MockRM rm2 = new TestSecurityMockRM(yarnConf, memStore) { + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return new DelegationTokenRenewer() { + + @Override + protected void renewToken(final DelegationTokenToRenew dttr) + throws IOException { + if (dttr.token.equals(updatedToken)) { + secondRenewInvoked.set(true); + super.renewToken(dttr); + } else if (dttr.token.equals(originalToken)) { + firstRenewInvoked.set(true); + throw new InvalidToken("Failed to renew"); + } else if (dttr.token.equals(nonHdfsToken)) { + nonHdfsTokenRenewInvoked.set(true); + throw new InvalidToken("Failed to renew for non-HDFS token"); + } + else { + throw new IOException("Unexpected"); + } + } + + @Override + protected Token[] obtainSystemTokensForUser(String user, + final Credentials credentials) throws IOException { + credentials.addToken(updatedToken.getService(), updatedToken); + return new Token[] { updatedToken }; + } + }; + } + }; + + // simulating restart the rm + rm2.start(); + + // check nm can retrieve the token + final MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); + nm1.registerNode(); + NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + + NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl( + ((NodeHeartbeatResponsePBImpl) response).getProto()); + + ByteBuffer tokenBuffer = + YarnServerBuilderUtils + .convertFromProtoFormat(proto.getSystemCredentialsForApps()) + .get(app.getApplicationId()); + Assert.assertNotNull(tokenBuffer); + Credentials appCredentials = new Credentials(); + DataInputByteBuffer buf = new DataInputByteBuffer(); + tokenBuffer.rewind(); + buf.reset(tokenBuffer); + appCredentials.readTokenStorageStream(buf); + Assert.assertTrue(nonHdfsTokenRenewInvoked.get() && firstRenewInvoked.get() + && secondRenewInvoked.get()); + Assert.assertTrue(appCredentials.getAllTokens().contains(updatedToken)); + } + // YARN will get the token for the app submitted without the delegation token. @Test public void testAppSubmissionWithoutDelegationToken() throws Exception {