diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1e55fe383b0..d3b08961eae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -709,6 +709,21 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES = 12800; + public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT = + RM_PREFIX + "delegation-token-renewer.thread-timeout"; + public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT = + 60; // 60 Seconds + + public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL = + RM_PREFIX + "delegation-token-renewer.thread-retry-interval"; + public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL = + 60; // 60 Seconds + + public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS = + RM_PREFIX + "delegation-token-renewer.thread-retry-max-attempts"; + public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS = + 10; + public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled"; public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index d3ed5032363..a5b4b8235b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -36,10 +36,12 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -115,6 +117,9 @@ private boolean tokenKeepAliveEnabled; private boolean hasProxyUserPrivileges; private long credentialsValidTimeRemaining; + private long tokenRenewerThreadTimeout; + private long tokenRenewerThreadRetryInterval; + private int tokenRenewerThreadRetryMaxAttempts; // this config is supposedly not used by end-users. public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING = @@ -140,6 +145,20 @@ protected void serviceInit(Configuration conf) throws Exception { this.credentialsValidTimeRemaining = conf.getLong(RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING, DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING); + tokenRenewerThreadTimeout = + conf.getTimeDuration( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, + TimeUnit.MILLISECONDS); + tokenRenewerThreadRetryInterval = + conf.getTimeDuration( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL, + TimeUnit.MILLISECONDS); + tokenRenewerThreadRetryMaxAttempts = + conf.getInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS); setLocalSecretManagerAndServiceAddr(); renewerService = createNewThreadPoolService(conf); pendingEventQueue = new LinkedBlockingQueue(); @@ -193,12 +212,66 @@ protected void serviceStart() throws Exception { private void processDelegationTokenRenewerEvent( DelegationTokenRenewerEvent evt) { serviceStateLock.readLock().lock(); + Future future = + null; try { if (isServiceStarted) { - renewerService.execute(new DelegationTokenRenewerRunnable(evt)); + future = + renewerService.submit(new DelegationTokenRenewerRunnable(evt)); + if (tokenRenewerThreadTimeout > 0) { + future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS); + } } else { pendingEventQueue.add(evt); } + } catch (Exception e) { + if (e instanceof TimeoutException) { + + // Cancel thread and retry the same event in case of timeout + if (future != null && !future.isDone() && !future.isCancelled()) { + future.cancel(true); + if (evt.getAttempt() < tokenRenewerThreadRetryMaxAttempts) { + renewalTimer.schedule(new TimerTask() { + @Override + public void run() { + + LOG.info( + "Retrying token renewer thread for appid = {} and attempt is {}", + evt.getApplicationId(), evt.getAttempt()); + + evt.incrAttempt(); + + AbstractDelegationTokenRenewerAppEvent recoverEvt = + (AbstractDelegationTokenRenewerAppEvent) evt; + + Collection> tokens = + recoverEvt.getCredentials().getAllTokens(); + for (Token token : tokens) { + DelegationTokenToRenew dttr = + allTokens.get(token); + removeFailedDelegationToken(dttr); + } + + DelegationTokenRenewerAppRecoverEvent event = + new DelegationTokenRenewerAppRecoverEvent( + recoverEvt.getApplicationId(), + recoverEvt.getCredentials(), + recoverEvt.shouldCancelAtEnd(), recoverEvt.getUser(), + recoverEvt.getTokenConf()); + event.setAttempt(evt.getAttempt()); + processDelegationTokenRenewerEvent(event); + } + }, tokenRenewerThreadRetryInterval); + } else { + LOG.info( + "Exhausted max retry attempts {} in token renewer thread. appid= {}", + tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId()); + } + } + } else { + LOG.info("Problem in submitting renew tasks in token renewer thread.", + e); + } } finally { serviceStateLock.readLock().unlock(); } @@ -439,7 +512,6 @@ private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt) } LOG.debug("Registering tokens for renewal for: appId = {}", applicationId); - Collection> tokens = ts.getAllTokens(); long now = System.currentTimeMillis(); @@ -476,7 +548,7 @@ private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt) for (Iterator> itor = tokenConf.iterator(); itor.hasNext(); ) { Map.Entry entry = itor.next(); - LOG.info(entry.getKey() + " ===> " + entry.getValue()); + LOG.debug(entry.getKey() + " ===> " + entry.getValue()); } } } else { @@ -505,7 +577,6 @@ private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt) tokenList.add(dttr); } } - if (!tokenList.isEmpty()) { // Renewing token and adding it to timer calls are separated purposefully // If user provides incorrect token then it should not be added for @@ -617,6 +688,7 @@ protected void setTimerForTokenRenewal(DelegationTokenToRenew token) @VisibleForTesting protected void renewToken(final DelegationTokenToRenew dttr) throws IOException { + // need to use doAs so that http can find the kerberos tgt // NOTE: token renewers should be responsible for the correct UGI! try { @@ -1016,6 +1088,10 @@ public boolean shouldCancelAtEnd() { public String getUser() { return user; } + + public Configuration getTokenConf() { + return tokenConf; + } } enum DelegationTokenRenewerEventType { @@ -1028,6 +1104,8 @@ public String getUser() { AbstractEvent { private ApplicationId appId; + private int attempt = + 1; public DelegationTokenRenewerEvent(ApplicationId appId, DelegationTokenRenewerEventType type) { @@ -1038,10 +1116,23 @@ public DelegationTokenRenewerEvent(ApplicationId appId, public ApplicationId getApplicationId() { return appId; } + + public void incrAttempt() { + attempt++; + } + + public int getAttempt() { + return attempt; + } + + public void setAttempt(int attempt) { + this.attempt = + attempt; + } } // only for testing protected ConcurrentMap, DelegationTokenToRenew> getAllTokens() { return allTokens; } -} +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index d973dca3c73..f678321464b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -42,6 +43,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -164,7 +166,7 @@ public long renew(Token t, Configuration conf) throws IOException { throw new InvalidToken("token has been canceled"); } lastRenewed = token; - counter ++; + counter++; LOG.info("Called MYDFS.renewdelegationtoken " + token + ";this dfs=" + this.hashCode() + ";c=" + counter); if(tokenToRenewIn2Sec == token) { @@ -211,6 +213,8 @@ public void setUp() throws Exception { counter = new AtomicInteger(0); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + conf.setInt(YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, + -1); conf.set("override_token_expire_time", "3000"); UserGroupInformation.setConfiguration(conf); eventQueue = new LinkedBlockingQueue(); @@ -1530,4 +1534,168 @@ public void testTokenSequenceNoAfterNewTokenAndRenewal() throws Exception { // Ensure incrTokenSequenceNo has been called for token renewal as well. Mockito.verify(mockContext, Mockito.times(2)).incrTokenSequenceNo(); } + + @Test(timeout = 60000) + public void testTokenThreadTimeout() throws Exception { + Configuration yarnConf = + new YarnConfiguration(); + yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, + true); + yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + yarnConf.set(YarnConfiguration.RM_STORE, + MemoryRMStateStore.class.getName()); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, 10); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL, + 10); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + 3); + UserGroupInformation.setConfiguration(yarnConf); + + Text userText1 = + new Text("user1"); + DelegationTokenIdentifier dtId1 = + new DelegationTokenIdentifier(userText1, new Text("renewer1"), userText1); + final Token originalToken = + new Token<>(dtId1.getBytes(), "password1".getBytes(), dtId1.getKind(), + new Text("service1")); + + Credentials credentials = + new Credentials(); + credentials.addToken(userText1, originalToken); + + AtomicBoolean renewDelay = + new AtomicBoolean(false); + AtomicInteger counter = + new AtomicInteger(0); + renewDelay.set(true); + DelegationTokenRenewer renewer = + createNewDelegationTokenRenewerForTimeout(yarnConf, counter, renewDelay); + + MockRM rm1 = + new TestSecurityMockRM(yarnConf) { + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return renewer; + } + }; + + rm1.start(); + rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, + credentials); + + Thread.sleep(40000); + + System.out.println("counter is " + counter.get()); + // Ensure no. of threads has been used in renewer service thread pool is + // higher than the configured max retry attempts + assertTrue(counter.get() >= yarnConf.getInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS)); + rm1.close(); + } + + @Test(timeout = 120000) + public void testTokenThreadTimeoutWithoutDelay() throws Exception { + Configuration yarnConf = + new YarnConfiguration(); + yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, + true); + yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + yarnConf.set(YarnConfiguration.RM_STORE, + MemoryRMStateStore.class.getName()); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, 60); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL, + 60); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + 3); + UserGroupInformation.setConfiguration(yarnConf); + + Text userText1 = + new Text("user1"); + DelegationTokenIdentifier dtId1 = + new DelegationTokenIdentifier(userText1, new Text("renewer1"), userText1); + final Token originalToken = + new Token<>(dtId1.getBytes(), "password1".getBytes(), dtId1.getKind(), + new Text("service1")); + + Credentials credentials = + new Credentials(); + credentials.addToken(userText1, originalToken); + + AtomicBoolean renewDelay = + new AtomicBoolean(false); + AtomicInteger counter = + new AtomicInteger(0); + DelegationTokenRenewer renwer = + createNewDelegationTokenRenewerForTimeout(yarnConf, counter, renewDelay); + + MockRM rm1 = + new TestSecurityMockRM(yarnConf) { + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return renwer; + } + }; + + rm1.start(); + rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, + credentials); + + Thread.sleep(60000); + + // Ensure only one thread has been used in renewer service thread pool + assertTrue(counter.get() == 1); + rm1.close(); + } + + private DelegationTokenRenewer createNewDelegationTokenRenewerForTimeout( + Configuration conf, final AtomicInteger counter, + final AtomicBoolean renewDelay) { + DelegationTokenRenewer renew = + new DelegationTokenRenewer() { + + @Override + protected ThreadPoolExecutor + createNewThreadPoolService(Configuration conf) { + ThreadPoolExecutor pool = + new ThreadPoolExecutor(5, 5, 3L, TimeUnit.SECONDS, + new LinkedBlockingQueue()) { + @Override + public Future submit(Runnable r) { + counter.incrementAndGet(); + return super.submit(r); + } + }; + return pool; + } + + @Override + protected void renewToken(final DelegationTokenToRenew dttr) + throws IOException { + try { + if (renewDelay.get()) { + // Delay for 4 times than the configured timeout + Thread.sleep(conf.getTimeDuration( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, + TimeUnit.MILLISECONDS) * 4); + } + super.renewToken(dttr); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + return renew; + } }