diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index f0a6ddf..46ac73c 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -362,6 +362,8 @@ public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers"; + public static final String JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE = "mapreduce.job.hdfs-servers.token-renewal.exclude"; + public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal"; public static final String JOB_CANCEL_DELEGATION_TOKEN = "mapreduce.job.complete.cancel.delegation.tokens"; diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java index 7b1f657..5a572cf 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java @@ -101,6 +101,20 @@ static void obtainTokensForNamenodesInternal(Credentials credentials, } } + static boolean isTokenRenewalExcluded(FileSystem fs, Configuration conf) { + String [] nns = + conf.getStrings(MRJobConfig.JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE); + if (nns != null) { + String host = fs.getUri().getHost(); + for(int i=0; i< nns.length; i++) { + if (nns[i].equals(host)) { + return true; + } + } + } + return false; + } + /** * get delegation token for a specific FS * @param fs @@ -111,11 +125,16 @@ static void obtainTokensForNamenodesInternal(Credentials credentials, */ static void obtainTokensForNamenodesInternal(FileSystem fs, Credentials credentials, Configuration conf) throws IOException { - String delegTokenRenewer = Master.getMasterPrincipal(conf); - if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { - throw new IOException( - "Can't get Master Kerberos principal for use as renewer"); + // RM skips renewing token with empty renewer + String delegTokenRenewer = ""; + if (!isTokenRenewalExcluded(fs, conf)) { + delegTokenRenewer = Master.getMasterPrincipal(conf); + if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { + throw new IOException( + "Can't get Master Kerberos principal for use as renewer"); + } } + mergeBinaryTokens(credentials, conf); final Token tokens[] = fs.addDelegationTokens(delegTokenRenewer, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index cb456d8..2ad74d2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -78,7 +78,9 @@ private static final Log LOG = LogFactory.getLog(DelegationTokenRenewer.class); - + @VisibleForTesting + public static final Text HDFS_DELEGATION_KIND = + new Text("HDFS_DELEGATION_TOKEN"); public static final String SCHEME = "hdfs"; // global single timer (daemon) @@ -242,7 +244,7 @@ public DelegationTokenToRenew(ApplicationId jId, Token token, String user) { this.token = token; this.user = user; - if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { + if (token.getKind().equals(HDFS_DELEGATION_KIND)) { try { AbstractDelegationTokenIdentifier identifier = (AbstractDelegationTokenIdentifier) token.decodeIdentifier(); @@ -409,7 +411,7 @@ private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt) boolean hasHdfsToken = false; for (Token token : tokens) { if (token.isManaged()) { - if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { + if (token.getKind().equals(HDFS_DELEGATION_KIND)) { LOG.info(applicationId + " found existing hdfs token " + token); hasHdfsToken = true; } @@ -437,7 +439,9 @@ private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt) // renewal. for (DelegationTokenToRenew dtr : tokenList) { try { - renewToken(dtr); + if (!skipTokenRenewal(dtr)) { + renewToken(dtr); + } } catch (IOException ioe) { throw new IOException("Failed to renew token: " + dtr.token, ioe); } @@ -445,7 +449,9 @@ private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt) for (DelegationTokenToRenew dtr : tokenList) { appTokens.get(applicationId).add(dtr); allTokens.put(dtr.token, dtr); - setTimerForTokenRenewal(dtr); + if (!skipTokenRenewal(dtr)) { + setTimerForTokenRenewal(dtr); + } } } @@ -479,8 +485,10 @@ public void run() { requestNewHdfsDelegationTokenIfNeeded(dttr); // if the token is not replaced by a new token, renew the token if (appTokens.get(dttr.applicationId).contains(dttr)) { - renewToken(dttr); - setTimerForTokenRenewal(dttr);// set the next one + if (!skipTokenRenewal(dttr)) { + renewToken(dttr); + setTimerForTokenRenewal(dttr);// set the next one + } } else { LOG.info("The token was removed already. Token = [" +dttr +"]"); } @@ -496,14 +504,29 @@ public boolean cancel() { return super.cancel(); } } - + + /* + * Skip renewing token if the renewer of the token is set to + * Token.NON_RENEWER + */ + private boolean skipTokenRenewal(DelegationTokenToRenew token) + throws IOException { + Token tt = token.token; + if (tt.isManaged() && tt.getKind().equals(HDFS_DELEGATION_KIND)) { + @SuppressWarnings("unchecked") + Text renewer = ((Token)tt). + decodeIdentifier().getRenewer(); + return (renewer != null && renewer.toString().equals("")); + } + return false; + } + /** * set task to renew the token */ @VisibleForTesting protected void setTimerForTokenRenewal(DelegationTokenToRenew token) throws IOException { - // calculate timer time long expiresIn = token.expirationDate - System.currentTimeMillis(); long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration @@ -546,7 +569,7 @@ private void requestNewHdfsDelegationTokenIfNeeded( if (hasProxyUserPrivileges && dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining - && dttr.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { + && dttr.token.getKind().equals(HDFS_DELEGATION_KIND)) { // remove all old expiring hdfs tokens for this application. Set tokenSet = appTokens.get(dttr.applicationId); @@ -555,7 +578,7 @@ private void requestNewHdfsDelegationTokenIfNeeded( synchronized (tokenSet) { while (iter.hasNext()) { DelegationTokenToRenew t = iter.next(); - if (t.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { + if (t.token.getKind().equals(HDFS_DELEGATION_KIND)) { iter.remove(); if (t.timerTask != null) { t.timerTask.cancel(); @@ -592,8 +615,10 @@ private void requestNewHdfsDelegationToken(ApplicationId applicationId, new DelegationTokenToRenew(applicationId, token, getConfig(), Time.now(), shouldCancelAtEnd, user); // renew the token to get the next expiration date. - renewToken(tokenToRenew); - setTimerForTokenRenewal(tokenToRenew); + if (!skipTokenRenewal(tokenToRenew)) { + renewToken(tokenToRenew); + setTimerForTokenRenewal(tokenToRenew); + } appTokens.get(applicationId).add(tokenToRenew); LOG.info("Received new token " + token); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 5d31404..0fba18f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -109,7 +109,8 @@ public class TestDelegationTokenRenewer { private static final Log LOG = LogFactory.getLog(TestDelegationTokenRenewer.class); - private static final Text KIND = new Text("HDFS_DELEGATION_TOKEN"); + private static final Text KIND = + DelegationTokenRenewer.HDFS_DELEGATION_KIND; private static BlockingQueue eventQueue; private static volatile AtomicInteger counter; @@ -467,7 +468,26 @@ public void testAppRejectionWithCancelledDelegationToken() throws Exception { } fail("App submission with a cancelled token should have failed"); } - + + // Testcase for YARN-3021, let RM skip renewing token if the renewer string + // is empty + @Test(timeout=60000) + public void testAppTokenWithNonRenewer() throws Exception { + MyFS dfs = (MyFS)FileSystem.get(conf); + LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode()); + + // Test would fail if using non-empty renewer string here + MyToken token = dfs.getDelegationToken(""); + token.cancelToken(); + + Credentials ts = new Credentials(); + ts.addToken(token.getKind(), token); + + // register the tokens for renewal + ApplicationId appId = BuilderUtils.newApplicationId(0, 0); + delegationTokenRenewer.addApplicationSync(appId, ts, true, "user"); + } + /** * Basic idea of the test: * 1. register a token for 2 seconds with no cancel at the end @@ -708,7 +728,7 @@ public void testDTRonAppSubmission() throws IOException, InterruptedException, BrokenBarrierException { final Credentials credsx = new Credentials(); final Token tokenx = mock(Token.class); - when(tokenx.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN")); + when(tokenx.getKind()).thenReturn(KIND); DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"), new Text("user1")); @@ -752,7 +772,7 @@ public void testConcurrentAddApplication() // this token uses barriers to block during renew final Credentials creds1 = new Credentials(); final Token token1 = mock(Token.class); - when(token1.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN")); + when(token1.getKind()).thenReturn(KIND); DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"), new Text("user1")); @@ -770,7 +790,7 @@ public Long answer(InvocationOnMock invocation) // this dummy token fakes renewing final Credentials creds2 = new Credentials(); final Token token2 = mock(Token.class); - when(token2.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN")); + when(token2.getKind()).thenReturn(KIND); when(token2.decodeIdentifier()).thenReturn(dtId1); creds2.addToken(new Text("token"), token2); doReturn(true).when(token2).isManaged();