diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1314bf9e67b..c630437d12c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -727,6 +728,21 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES = 12800; + public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT = + RM_PREFIX + "delegation-token-renewer.thread-timeout"; + public static final long DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT = + TimeUnit.SECONDS.toMillis(60); // 60 Seconds + + public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL = + RM_PREFIX + "delegation-token-renewer.thread-retry-interval"; + public static final long DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL = + TimeUnit.SECONDS.toMillis(60); // 60 Seconds + + public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS = + RM_PREFIX + "delegation-token-renewer.thread-retry-max-attempts"; + public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS = + 10; + public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled"; public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index d3ed5032363..3d599c194fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -36,10 +36,12 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -115,6 +117,9 @@ private boolean tokenKeepAliveEnabled; private boolean hasProxyUserPrivileges; private long credentialsValidTimeRemaining; + private long tokenRenewerThreadTimeout; + private long tokenRenewerThreadRetryInterval; + private int tokenRenewerThreadRetryMaxAttempts; // this config is supposedly not used by end-users. public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING = @@ -140,6 +145,17 @@ protected void serviceInit(Configuration conf) throws Exception { this.credentialsValidTimeRemaining = conf.getLong(RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING, DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING); + tokenRenewerThreadTimeout = conf.getTimeDuration( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, + TimeUnit.MILLISECONDS); + tokenRenewerThreadRetryInterval = conf.getTimeDuration( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL, + TimeUnit.MILLISECONDS); + tokenRenewerThreadRetryMaxAttempts = conf.getInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS); setLocalSecretManagerAndServiceAddr(); renewerService = createNewThreadPoolService(conf); pendingEventQueue = new LinkedBlockingQueue(); @@ -193,12 +209,54 @@ protected void serviceStart() throws Exception { private void processDelegationTokenRenewerEvent( DelegationTokenRenewerEvent evt) { serviceStateLock.readLock().lock(); + Future future = null; try { if (isServiceStarted) { - renewerService.execute(new DelegationTokenRenewerRunnable(evt)); + future = renewerService.submit(new DelegationTokenRenewerRunnable(evt)); + future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS); } else { pendingEventQueue.add(evt); } + } catch (TimeoutException e) { + // Cancel thread and retry the same event in case of timeout + if (future != null && !future.isDone() && !future.isCancelled()) { + future.cancel(true); + if (evt.getAttempt() < tokenRenewerThreadRetryMaxAttempts) { + renewalTimer.schedule(new TimerTask() { + @Override + public void run() { + LOG.info( + "Retrying token renewer thread for appid = {} and attempt is {}", + evt.getApplicationId(), evt.getAttempt()); + evt.incrAttempt(); + + AbstractDelegationTokenRenewerAppEvent recoverEvt = + (AbstractDelegationTokenRenewerAppEvent) evt; + Collection> tokens = + recoverEvt.getCredentials().getAllTokens(); + for (Token token : tokens) { + DelegationTokenToRenew dttr = allTokens.get(token); + removeFailedDelegationToken(dttr); + } + + DelegationTokenRenewerAppRecoverEvent event = + new DelegationTokenRenewerAppRecoverEvent( + recoverEvt.getApplicationId(), + recoverEvt.getCredentials(), + recoverEvt.shouldCancelAtEnd(), recoverEvt.getUser(), + recoverEvt.getTokenConf()); + event.setAttempt(evt.getAttempt()); + processDelegationTokenRenewerEvent(event); + } + }, tokenRenewerThreadRetryInterval); + } else { + LOG.info( + "Exhausted max retry attempts {} in token renewer thread. appid= {}", + tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId()); + } + } + } catch (Exception e) { + LOG.info("Problem in submitting renew tasks in token renewer thread.", e); } finally { serviceStateLock.readLock().unlock(); } @@ -476,7 +534,8 @@ private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt) for (Iterator> itor = tokenConf.iterator(); itor.hasNext(); ) { Map.Entry entry = itor.next(); - LOG.info(entry.getKey() + " ===> " + entry.getValue()); + LOG.debug("Token conf key is {} and value is {}", + entry.getKey(), entry.getValue()); } } } else { @@ -1016,6 +1075,10 @@ public boolean shouldCancelAtEnd() { public String getUser() { return user; } + + public Configuration getTokenConf() { + return tokenConf; + } } enum DelegationTokenRenewerEventType { @@ -1028,6 +1091,7 @@ public String getUser() { AbstractEvent { private ApplicationId appId; + private int attempt = 1; public DelegationTokenRenewerEvent(ApplicationId appId, DelegationTokenRenewerEventType type) { @@ -1038,6 +1102,18 @@ public DelegationTokenRenewerEvent(ApplicationId appId, public ApplicationId getApplicationId() { return appId; } + + public void incrAttempt() { + attempt++; + } + + public int getAttempt() { + return attempt; + } + + public void setAttempt(int attempt) { + this.attempt = attempt; + } } // only for testing diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index ee104b41336..d72ce4f5142 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -42,6 +43,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -164,7 +166,7 @@ public long renew(Token t, Configuration conf) throws IOException { throw new InvalidToken("token has been canceled"); } lastRenewed = token; - counter ++; + counter++; LOG.info("Called MYDFS.renewdelegationtoken " + token + ";this dfs=" + this.hashCode() + ";c=" + counter); if(tokenToRenewIn2Sec == token) { @@ -1534,4 +1536,154 @@ public void testTokenSequenceNoAfterNewTokenAndRenewal() throws Exception { // Ensure incrTokenSequenceNo has been called for token renewal as well. Mockito.verify(mockContext, Mockito.times(2)).incrTokenSequenceNo(); } + + @Test(timeout = 60000) + public void testTokenThreadTimeout() throws Exception { + Configuration yarnConf = new YarnConfiguration(); + yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, + true); + yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + yarnConf.set(YarnConfiguration.RM_STORE, + MemoryRMStateStore.class.getName()); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, 10); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL, + 10); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + 3); + UserGroupInformation.setConfiguration(yarnConf); + + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1, + new Text("renewer1"), userText1); + final Token originalToken = + new Token<>(dtId1.getBytes(), "password1".getBytes(), dtId1.getKind(), + new Text("service1")); + + Credentials credentials = new Credentials(); + credentials.addToken(userText1, originalToken); + + AtomicBoolean renewDelay = new AtomicBoolean(false); + AtomicInteger counter = new AtomicInteger(0); + renewDelay.set(true); + DelegationTokenRenewer renewer = createNewDelegationTokenRenewerForTimeout( + yarnConf, counter, renewDelay); + + MockRM rm1 = + new TestSecurityMockRM(yarnConf) { + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return renewer; + } + }; + + rm1.start(); + rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, + credentials); + + Thread.sleep(40000); + + // Ensure no. of threads has been used in renewer service thread pool is + // higher than the configured max retry attempts + assertTrue(counter.get() >= yarnConf.getInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS)); + rm1.close(); + } + + @Test(timeout = 120000) + public void testTokenThreadTimeoutWithoutDelay() throws Exception { + Configuration yarnConf = new YarnConfiguration(); + yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, + true); + yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + yarnConf.set(YarnConfiguration.RM_STORE, + MemoryRMStateStore.class.getName()); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, 60); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL, + 60); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + 3); + UserGroupInformation.setConfiguration(yarnConf); + + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1, + new Text("renewer1"), userText1); + final Token originalToken = + new Token<>(dtId1.getBytes(), "password1".getBytes(), dtId1.getKind(), + new Text("service1")); + + Credentials credentials = new Credentials(); + credentials.addToken(userText1, originalToken); + + AtomicBoolean renewDelay = new AtomicBoolean(false); + AtomicInteger counter = new AtomicInteger(0); + DelegationTokenRenewer renwer = createNewDelegationTokenRenewerForTimeout( + yarnConf, counter, renewDelay); + + MockRM rm1 = + new TestSecurityMockRM(yarnConf) { + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return renwer; + } + }; + + rm1.start(); + rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, + credentials); + + Thread.sleep(60000); + + // Ensure only one thread has been used in renewer service thread pool + assertTrue(counter.get() == 1); + rm1.close(); + } + + private DelegationTokenRenewer createNewDelegationTokenRenewerForTimeout( + Configuration conf, final AtomicInteger counter, + final AtomicBoolean renewDelay) { + DelegationTokenRenewer renew = new DelegationTokenRenewer() { + @Override + protected ThreadPoolExecutor createNewThreadPoolService( + Configuration conf) { + ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 3L, + TimeUnit.SECONDS, new LinkedBlockingQueue()) { + @Override + public Future submit(Runnable r) { + counter.incrementAndGet(); + return super.submit(r); + } + }; + return pool; + } + + @Override + protected void renewToken(final DelegationTokenToRenew dttr) + throws IOException { + try { + if (renewDelay.get()) { + // Delay for 4 times than the configured timeout + Thread.sleep(conf.getTimeDuration( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, + TimeUnit.MILLISECONDS) * 4); + } + super.renewToken(dttr); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + return renew; + } }