diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 49d2ff05801..6495bcdfcf1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -729,6 +730,19 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES = 12800; + public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT = + RM_PREFIX + "delegation-token-renewer.thread-timeout"; + public static final long DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT = + TimeUnit.SECONDS.toMillis(60); // 60 Seconds + public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL = + RM_PREFIX + "delegation-token-renewer.thread-retry-interval"; + public static final long DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL = + TimeUnit.SECONDS.toMillis(60); // 60 Seconds + public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS = + RM_PREFIX + "delegation-token-renewer.thread-retry-max-attempts"; + public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS = + 10; + public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled"; public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index d3ed5032363..4cda065da37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -36,16 +37,16 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -70,6 +71,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -115,6 +118,12 @@ private boolean tokenKeepAliveEnabled; private boolean hasProxyUserPrivileges; private long credentialsValidTimeRemaining; + private long tokenRenewerThreadTimeout; + private long tokenRenewerThreadRetryInterval; + private int tokenRenewerThreadRetryMaxAttempts; + private final Map> futures = + new HashMap>(); + private boolean delegationTokenRenewerPoolTrackerFlag = true; // this config is supposedly not used by end-users. public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING = @@ -140,6 +149,17 @@ protected void serviceInit(Configuration conf) throws Exception { this.credentialsValidTimeRemaining = conf.getLong(RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING, DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING); + tokenRenewerThreadTimeout = conf.getTimeDuration( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, + TimeUnit.MILLISECONDS); + tokenRenewerThreadRetryInterval = conf.getTimeDuration( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL, + TimeUnit.MILLISECONDS); + tokenRenewerThreadRetryMaxAttempts = conf.getInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS); setLocalSecretManagerAndServiceAddr(); renewerService = createNewThreadPoolService(conf); pendingEventQueue = new LinkedBlockingQueue(); @@ -184,6 +204,11 @@ protected void serviceStart() throws Exception { serviceStateLock.writeLock().lock(); isServiceStarted = true; serviceStateLock.writeLock().unlock(); + + if (delegationTokenRenewerPoolTrackerFlag) { + renewerService.submit(new DelegationTokenRenewerPoolTracker()); + } + while(!pendingEventQueue.isEmpty()) { processDelegationTokenRenewerEvent(pendingEventQueue.take()); } @@ -195,7 +220,9 @@ private void processDelegationTokenRenewerEvent( serviceStateLock.readLock().lock(); try { if (isServiceStarted) { - renewerService.execute(new DelegationTokenRenewerRunnable(evt)); + Future future = + renewerService.submit(new DelegationTokenRenewerRunnable(evt)); + futures.put(evt, future); } else { pendingEventQueue.add(evt); } @@ -476,7 +503,8 @@ private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt) for (Iterator> itor = tokenConf.iterator(); itor.hasNext(); ) { Map.Entry entry = itor.next(); - LOG.info(entry.getKey() + " ===> " + entry.getValue()); + LOG.debug("Token conf key is {} and value is {}", + entry.getKey(), entry.getValue()); } } } else { @@ -894,7 +922,90 @@ public void run() { public void setRMContext(RMContext rmContext) { this.rmContext = rmContext; } - + + @VisibleForTesting + public void setDelegationTokenRenewerPoolTracker(boolean flag) { + delegationTokenRenewerPoolTrackerFlag = flag; + } + + private TimerTask getTimerTask(AbstractDelegationTokenRenewerAppEvent evt) { + return new TimerTask() { + @Override + public void run() { + LOG.info("Retrying token renewer thread for appid = {} and " + + "attempt is {}", evt.getApplicationId(), + evt.getAttempt()); + evt.incrAttempt(); + + Collection> tokens = + evt.getCredentials().getAllTokens(); + for (Token token : tokens) { + DelegationTokenToRenew dttr = allTokens.get(token); + if (dttr != null) { + removeFailedDelegationToken(dttr); + } + } + + DelegationTokenRenewerAppRecoverEvent event = + new DelegationTokenRenewerAppRecoverEvent( + evt.getApplicationId(), evt.getCredentials(), + evt.shouldCancelAtEnd(), evt.getUser(), evt.getTokenConf()); + event.setAttempt(evt.getAttempt()); + processDelegationTokenRenewerEvent(event); + } + }; + } + + private final class DelegationTokenRenewerPoolTracker + implements Runnable { + + public DelegationTokenRenewerPoolTracker() { + } + + /** + * Keep traversing of renewer pool threads and wait for specific + * timeout. In case of timeout exception, retry the event till no. of + * attempts reaches max attempts with specific interval. + */ + @Override + public void run() { + + while (true) { + + for (Map.Entry> entry : futures + .entrySet()) { + + DelegationTokenRenewerEvent evt = entry.getKey(); + Future future = entry.getValue(); + + try { + future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + + // Cancel thread and retry the same event in case of timeout + if (future != null && !future.isDone() && !future.isCancelled()) { + future.cancel(true); + futures.remove(evt); + if (evt.getAttempt() < tokenRenewerThreadRetryMaxAttempts) { + renewalTimer.schedule( + getTimerTask((AbstractDelegationTokenRenewerAppEvent) evt), + tokenRenewerThreadRetryInterval); + } else { + LOG.info( + "Exhausted max retry attempts {} in token renewer " + + "thread for {}", + tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId()); + } + } + } catch (Exception e) { + LOG.info("Problem in submitting renew tasks in token renewer " + + "thread.", e); + } + } + } + } + } + /* * This will run as a separate thread and will process individual events. It * is done in this way to make sure that the token renewal as a part of @@ -1016,6 +1127,10 @@ public boolean shouldCancelAtEnd() { public String getUser() { return user; } + + private Configuration getTokenConf() { + return tokenConf; + } } enum DelegationTokenRenewerEventType { @@ -1028,6 +1143,7 @@ public String getUser() { AbstractEvent { private ApplicationId appId; + private int attempt = 1; public DelegationTokenRenewerEvent(ApplicationId appId, DelegationTokenRenewerEventType type) { @@ -1038,6 +1154,18 @@ public DelegationTokenRenewerEvent(ApplicationId appId, public ApplicationId getApplicationId() { return appId; } + + public void incrAttempt() { + attempt++; + } + + public int getAttempt() { + return attempt; + } + + public void setAttempt(int attempt) { + this.attempt = attempt; + } } // only for testing diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index ee104b41336..2f99be1a1cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -42,6 +43,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -91,6 +93,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -228,6 +231,7 @@ public void setUp() throws Exception { InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); + delegationTokenRenewer.setDelegationTokenRenewerPoolTracker(false); delegationTokenRenewer.setRMContext(mockContext); delegationTokenRenewer.init(conf); delegationTokenRenewer.start(); @@ -601,6 +605,7 @@ public void testDTKeepAlive1 () throws Exception { InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); + localDtr.setDelegationTokenRenewerPoolTracker(false); localDtr.setRMContext(mockContext); localDtr.init(lconf); localDtr.start(); @@ -681,6 +686,7 @@ public void testDTKeepAlive2() throws Exception { InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); + localDtr.setDelegationTokenRenewerPoolTracker(false); localDtr.setRMContext(mockContext); localDtr.init(lconf); localDtr.start(); @@ -1534,4 +1540,167 @@ public void testTokenSequenceNoAfterNewTokenAndRenewal() throws Exception { // Ensure incrTokenSequenceNo has been called for token renewal as well. Mockito.verify(mockContext, Mockito.times(2)).incrTokenSequenceNo(); } + + @Test(timeout = 30000) + public void testTokenThreadTimeout() throws Exception { + Configuration yarnConf = new YarnConfiguration(); + yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, + true); + yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + yarnConf.setClass(YarnConfiguration.RM_STORE, MemoryRMStateStore.class, + RMStateStore.class); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, 10); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL, + 10); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + 3); + UserGroupInformation.setConfiguration(yarnConf); + + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1, + new Text("renewer1"), userText1); + final Token originalToken = + new Token<>(dtId1.getBytes(), "password1".getBytes(), dtId1.getKind(), + new Text("service1")); + + Credentials credentials = new Credentials(); + credentials.addToken(userText1, originalToken); + + AtomicBoolean renewDelay = new AtomicBoolean(false); + + // -1 is because of thread allocated to pool tracker runnable tasks + AtomicInteger counter = new AtomicInteger(-1); + renewDelay.set(true); + DelegationTokenRenewer renewer = createNewDelegationTokenRenewerForTimeout( + yarnConf, counter, renewDelay); + + MockRM rm1 = new TestSecurityMockRM(yarnConf) { + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return renewer; + } + }; + + rm1.start(); + rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, + credentials); + + int attempts = yarnConf.getInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS); + + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + return (counter.get() >= attempts); + } + }, 2000, 30000); + + // Ensure no. of threads has been used in renewer service thread pool is + // higher than the configured max retry attempts + assertTrue(counter.get() >= attempts); + rm1.close(); + } + + @Test(timeout = 30000) + public void testTokenThreadTimeoutWithoutDelay() throws Exception { + Configuration yarnConf = new YarnConfiguration(); + yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, + true); + yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + yarnConf.set(YarnConfiguration.RM_STORE, + MemoryRMStateStore.class.getName()); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, 60); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL, + 60); + yarnConf.setInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + 3); + UserGroupInformation.setConfiguration(yarnConf); + + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1, + new Text("renewer1"), userText1); + final Token originalToken = + new Token<>(dtId1.getBytes(), "password1".getBytes(), dtId1.getKind(), + new Text("service1")); + + Credentials credentials = new Credentials(); + credentials.addToken(userText1, originalToken); + + AtomicBoolean renewDelay = new AtomicBoolean(false); + + // -1 is because of thread allocated to pool tracker runnable tasks + AtomicInteger counter = new AtomicInteger(-1); + DelegationTokenRenewer renwer = createNewDelegationTokenRenewerForTimeout( + yarnConf, counter, renewDelay); + + MockRM rm1 = new TestSecurityMockRM(yarnConf) { + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return renwer; + } + }; + + rm1.start(); + rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, + credentials); + + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + return (counter.get() == 1); + } + }, 2000, 40000); + + // Ensure only one thread has been used in renewer service thread pool. + assertTrue(counter.get() == 1); + rm1.close(); + } + + private DelegationTokenRenewer createNewDelegationTokenRenewerForTimeout( + Configuration conf, final AtomicInteger counter, + final AtomicBoolean renewDelay) { + DelegationTokenRenewer renew = new DelegationTokenRenewer() { + @Override + protected ThreadPoolExecutor createNewThreadPoolService( + Configuration conf) { + ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 3L, + TimeUnit.SECONDS, new LinkedBlockingQueue()) { + @Override + public Future submit(Runnable r) { + counter.incrementAndGet(); + return super.submit(r); + } + }; + return pool; + } + + @Override + protected void renewToken(final DelegationTokenToRenew dttr) + throws IOException { + try { + if (renewDelay.get()) { + // Delay for 4 times than the configured timeout + Thread.sleep(conf.getTimeDuration( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, + TimeUnit.MILLISECONDS) * 4); + } + super.renewToken(dttr); + } catch (InterruptedException e) { + LOG.info("Sleep Interrupted", e); + } + } + }; + renew.setDelegationTokenRenewerPoolTracker(true); + return renew; + } }