diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 49d2ff05801..f45d6a5aef0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -729,6 +730,19 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES = 12800; + public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT = + RM_PREFIX + "delegation-token-renewer.thread-timeout"; + public static final long DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT = + TimeUnit.SECONDS.toMillis(60); // 60 Seconds + public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL = + RM_PREFIX + "delegation-token-renewer.thread-retry-interval"; + public static final long DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL = + TimeUnit.SECONDS.toMillis(60); // 60 Seconds + public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS = + RM_PREFIX + "delegation-token-renewer.thread-retry-max-attempts"; + public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS = + +10; + public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled"; public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false; diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index d3ed5032363..c29d2df534c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -36,16 +36,16 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -70,6 +70,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ 
-115,6 +117,12 @@ private boolean tokenKeepAliveEnabled; private boolean hasProxyUserPrivileges; private long credentialsValidTimeRemaining; + private long tokenRenewerThreadTimeout; + private long tokenRenewerThreadRetryInterval; + private int tokenRenewerThreadRetryMaxAttempts; + private final ConcurrentHashMap> futures = + new ConcurrentHashMap>(); + private boolean delegationTokenRenewerPoolTrackerFlag = true; // this config is supposedly not used by end-users. public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING = @@ -140,6 +148,17 @@ protected void serviceInit(Configuration conf) throws Exception { this.credentialsValidTimeRemaining = conf.getLong(RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING, DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING); + tokenRenewerThreadTimeout = conf.getTimeDuration( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, + TimeUnit.MILLISECONDS); + tokenRenewerThreadRetryInterval = conf.getTimeDuration( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL, + TimeUnit.MILLISECONDS); + tokenRenewerThreadRetryMaxAttempts = conf.getInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS); setLocalSecretManagerAndServiceAddr(); renewerService = createNewThreadPoolService(conf); pendingEventQueue = new LinkedBlockingQueue(); @@ -184,6 +203,11 @@ protected void serviceStart() throws Exception { serviceStateLock.writeLock().lock(); isServiceStarted = true; serviceStateLock.writeLock().unlock(); + + if (delegationTokenRenewerPoolTrackerFlag) { + renewerService.submit(new DelegationTokenRenewerPoolTracker()); + } + while(!pendingEventQueue.isEmpty()) { processDelegationTokenRenewerEvent(pendingEventQueue.take()); } @@ -195,7 +219,9 @@ private void 
processDelegationTokenRenewerEvent(
     serviceStateLock.readLock().lock();
     try {
       if (isServiceStarted) {
-        renewerService.execute(new DelegationTokenRenewerRunnable(evt));
+        Future<?> future =
+            renewerService.submit(new DelegationTokenRenewerRunnable(evt));
+        futures.put(evt, future);
       } else {
         pendingEventQueue.add(evt);
       }
@@ -476,7 +502,8 @@ private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
                 for (Iterator<Map.Entry<String, String>> itor =
                      tokenConf.iterator(); itor.hasNext(); ) {
                   Map.Entry<String, String> entry = itor.next();
-                  LOG.info(entry.getKey() + " ===> " + entry.getValue());
+                  LOG.debug("Token conf key is {} and value is {}",
+                      entry.getKey(), entry.getValue());
                 }
               }
             } else {
@@ -895,21 +922,154 @@ public void setRMContext(RMContext rmContext) {
     this.rmContext = rmContext;
   }
 
+  @VisibleForTesting
+  public void setDelegationTokenRenewerPoolTracker(boolean flag) {
+    delegationTokenRenewerPoolTrackerFlag = flag;
+  }
+
+  /**
+   * Builds the timer task that resubmits a renewer event whose worker thread
+   * was cancelled after timing out.
+   *
+   * @param evt the timed-out event; its attempt counter is incremented when
+   *          the task runs
+   * @return the task to schedule on {@code renewalTimer}
+   */
+  private TimerTask getTimerTask(DelegationTokenRenewerEvent evt) {
+    return new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          // Count this retry first so the log line reports the attempt that
+          // is about to start, not the one that timed out.
+          evt.incrAttempt();
+          LOG.info("Retrying token renewer thread for appid = {} and "
+              + "attempt is {}", evt.getApplicationId(),
+              evt.getAttempt());
+
+          if (!(evt instanceof AbstractDelegationTokenRenewerAppEvent)) {
+            // No credentials to rebuild a recover event from: resubmit the
+            // event as-is rather than failing on the cast below.
+            processDelegationTokenRenewerEvent(evt);
+            return;
+          }
+
+          AbstractDelegationTokenRenewerAppEvent recoverEvt =
+              (AbstractDelegationTokenRenewerAppEvent) evt;
+          Collection<Token<?>> tokens =
+              recoverEvt.getCredentials().getAllTokens();
+          // Clear the event's tokens still held in allTokens before retrying.
+          for (Token<?> token : tokens) {
+            DelegationTokenToRenew dttr = allTokens.get(token);
+            if (dttr != null) {
+              removeFailedDelegationToken(dttr);
+            }
+          }
+
+          DelegationTokenRenewerAppRecoverEvent event =
+              new DelegationTokenRenewerAppRecoverEvent(
+                  recoverEvt.getApplicationId(),
+                  recoverEvt.getCredentials(),
+                  recoverEvt.shouldCancelAtEnd(),
+                  recoverEvt.getUser(), recoverEvt.getTokenConf());
+          event.setAttempt(evt.getAttempt());
+          processDelegationTokenRenewerEvent(event);
+        } catch (RuntimeException e) {
+          // An exception escaping a TimerTask terminates the Timer's thread,
+          // which would cancel everything else scheduled on renewalTimer.
+          LOG.warn("Failed to retry token renewer thread for appid = {}",
+              evt.getApplicationId(), e);
+        }
+      }
+    };
+  }
+
+  /**
+   * Watches the futures of submitted renewer tasks. A task that does not
+   * finish within {@code tokenRenewerThreadTimeout} is cancelled and, while
+   * its attempt count is below {@code tokenRenewerThreadRetryMaxAttempts},
+   * rescheduled after {@code tokenRenewerThreadRetryInterval}.
+   */
+  private final class DelegationTokenRenewerPoolTracker
+      implements Runnable {
+
+    // Pause between scans while nothing is tracked, so the tracker does not
+    // spin on an empty map.
+    private static final long IDLE_SCAN_INTERVAL_MS = 100L;
+
+    public DelegationTokenRenewerPoolTracker() {
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (!Thread.currentThread().isInterrupted()) {
+          if (futures.isEmpty()) {
+            Thread.sleep(IDLE_SCAN_INTERVAL_MS);
+            continue;
+          }
+          for (Map.Entry<DelegationTokenRenewerEvent, Future<?>> entry
+              : futures.entrySet()) {
+            track(entry.getKey(), entry.getValue());
+          }
+        }
+      } catch (InterruptedException e) {
+        // Asked to stop: restore the flag and release the pool thread.
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    /**
+     * Waits up to the configured timeout for one task, then either stops
+     * tracking it (finished or failed) or cancels and retries it (timed out).
+     */
+    private void track(DelegationTokenRenewerEvent evt, Future<?> future)
+        throws InterruptedException {
+      try {
+        future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS);
+        // Finished: forget it, otherwise the map grows without bound and
+        // every scan re-visits completed tasks.
+        futures.remove(evt, future);
+      } catch (TimeoutException e) {
+        // Cancel thread and retry the same event in case of timeout
+        if (!future.isDone() && !future.isCancelled()) {
+          future.cancel(true);
+          futures.remove(evt, future);
+          if (evt.getAttempt() < tokenRenewerThreadRetryMaxAttempts) {
+            renewalTimer.schedule(getTimerTask(evt),
+                tokenRenewerThreadRetryInterval);
+          } else {
+            LOG.info(
+                "Exhausted max retry attempts {} in token renewer "
+                    + "thread for {}",
+                tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId());
+          }
+        }
+      } catch (InterruptedException e) {
+        throw e;
+      } catch (Exception e) {
+        // The task itself failed or was cancelled; nothing left to track.
+        futures.remove(evt, future);
+        LOG.info("Problem in submitting renew tasks in token renewer "
+            + "thread.", e);
+      }
+    }
+  }
+
   /*
    * This will run as a separate thread and will process individual events. It
    * is done in this way to make sure that the token renewal as a part of
-   * application submission and token removal as a part of application finish
-   * is asynchronous in nature.
+   * application submission and token removal as a part of application finish is
+   * asynchronous in nature.
    */
-  private final class DelegationTokenRenewerRunnable
-      implements Runnable {
+  private final class DelegationTokenRenewerRunnable implements Runnable {
 
     private DelegationTokenRenewerEvent evt;
-    
+
     public DelegationTokenRenewerRunnable(DelegationTokenRenewerEvent evt) {
       this.evt = evt;
     }
-    
+
     @Override
     public void run() {
       if (evt instanceof DelegationTokenRenewerAppSubmitEvent) {
@@ -1016,6 +1176,10 @@ public boolean shouldCancelAtEnd() {
     public String getUser() {
       return user;
     }
+
+    private Configuration getTokenConf() {
+      return tokenConf;
+    }
   }
 
   enum DelegationTokenRenewerEventType {
@@ -1028,6 +1192,7 @@ public String getUser() {
       AbstractEvent<DelegationTokenRenewerEventType> {
 
     private ApplicationId appId;
+    private int attempt = 1;
 
     public DelegationTokenRenewerEvent(ApplicationId appId,
         DelegationTokenRenewerEventType type) {
@@ -1038,6 +1203,18 @@ public DelegationTokenRenewerEvent(ApplicationId appId,
     public ApplicationId getApplicationId() {
       return appId;
     }
+
+    public void incrAttempt() {
+      attempt++;
+    }
+
+    public int getAttempt() {
+      return attempt;
+    }
+
+    public void setAttempt(int attempt) {
+      this.attempt = attempt;
+    }
   }
 
   // only for testing
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
index ee104b41336..58b64a6c9a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
@@ -21,6 +21,7 @@
 
 import static
org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -42,6 +43,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -164,7 +166,7 @@ public long renew(Token t, Configuration conf) throws IOException { throw new InvalidToken("token has been canceled"); } lastRenewed = token; - counter ++; + counter++; LOG.info("Called MYDFS.renewdelegationtoken " + token + ";this dfs=" + this.hashCode() + ";c=" + counter); if(tokenToRenewIn2Sec == token) { @@ -228,6 +230,7 @@ public void setUp() throws Exception { InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); + delegationTokenRenewer.setDelegationTokenRenewerPoolTracker(false); delegationTokenRenewer.setRMContext(mockContext); delegationTokenRenewer.init(conf); delegationTokenRenewer.start(); @@ -601,6 +604,7 @@ public void testDTKeepAlive1 () throws Exception { InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); + localDtr.setDelegationTokenRenewerPoolTracker(false); localDtr.setRMContext(mockContext); localDtr.init(lconf); localDtr.start(); @@ -620,10 +624,10 @@ public void testDTKeepAlive1 () throws Exception { localDtr.addApplicationAsync(applicationId_0, ts, true, "user", new Configuration()); waitForEventsToGetProcessed(localDtr); - if (!eventQueue.isEmpty()){ + if (!eventQueue.isEmpty()) { Event evt = eventQueue.take(); if (evt instanceof RMAppEvent) { - 
Assert.assertEquals(((RMAppEvent)evt).getType(), RMAppEventType.START);
+        Assert.assertEquals(((RMAppEvent) evt).getType(), RMAppEventType.START);
       } else {
         fail("RMAppEvent.START was expected!!");
       }
@@ -681,6 +685,7 @@ public void testDTKeepAlive2() throws Exception {
     InetSocketAddress sockAddr =
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
+    localDtr.setDelegationTokenRenewerPoolTracker(false);
     localDtr.setRMContext(mockContext);
     localDtr.init(lconf);
     localDtr.start();
@@ -1534,4 +1539,179 @@ public void testTokenSequenceNoAfterNewTokenAndRenewal() throws Exception {
     // Ensure incrTokenSequenceNo has been called for token renewal as well.
     Mockito.verify(mockContext, Mockito.times(2)).incrTokenSequenceNo();
   }
+
+  @Test(timeout = 30000)
+  public void testTokenThreadTimeout() throws Exception {
+    Configuration yarnConf = new YarnConfiguration();
+    yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
+        true);
+    yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    yarnConf.set(YarnConfiguration.RM_STORE,
+        MemoryRMStateStore.class.getName());
+    yarnConf.setInt(
+        YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, 10);
+    yarnConf.setInt(
+        YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL,
+        10);
+    yarnConf.setInt(
+        YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
+        3);
+    UserGroupInformation.setConfiguration(yarnConf);
+
+    Text userText1 = new Text("user1");
+    DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1,
+        new Text("renewer1"), userText1);
+    final Token<DelegationTokenIdentifier> originalToken =
+        new Token<>(dtId1.getBytes(), "password1".getBytes(), dtId1.getKind(),
+            new Text("service1"));
+
+    Credentials credentials = new Credentials();
+    credentials.addToken(userText1, originalToken);
+
+    AtomicBoolean renewDelay = new AtomicBoolean(true);
+
+    // -1 is because of thread allocated to pool tracker runnable tasks
+    AtomicInteger counter = new AtomicInteger(-1);
+    DelegationTokenRenewer renewer = createNewDelegationTokenRenewerForTimeout(
+        yarnConf, counter, renewDelay);
+
+    MockRM rm1 = new TestSecurityMockRM(yarnConf) {
+      @Override
+      protected DelegationTokenRenewer createDelegationTokenRenewer() {
+        return renewer;
+      }
+    };
+
+    try {
+      rm1.start();
+      rm1.submitApp(200, "name", "user",
+          new HashMap<ApplicationAccessType, String>(), false, "default", 1,
+          credentials);
+
+      int attempts = yarnConf.getInt(
+          YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
+          YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS);
+
+      // Keep the wait below the JUnit timeout so a failure surfaces as this
+      // wait's own timeout instead of the test being killed.
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return (counter.get() >= attempts);
+        }
+      }, 2000, 20000);
+
+      // Ensure the number of tasks submitted to the renewer service thread
+      // pool is at least the configured max retry attempts
+      assertTrue(counter.get() >= attempts);
+    } finally {
+      rm1.close();
+    }
+  }
+
+  @Test(timeout = 30000)
+  public void testTokenThreadTimeoutWithoutDelay() throws Exception {
+    Configuration yarnConf = new YarnConfiguration();
+    yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
+        true);
+    yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    yarnConf.set(YarnConfiguration.RM_STORE,
+        MemoryRMStateStore.class.getName());
+    yarnConf.setInt(
+        YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, 60);
+    yarnConf.setInt(
+        YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL,
+        60);
+    yarnConf.setInt(
+        YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
+        3);
+    UserGroupInformation.setConfiguration(yarnConf);
+
+    Text userText1 = new Text("user1");
+    DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1,
+        new Text("renewer1"), userText1);
+    final Token<DelegationTokenIdentifier> originalToken =
+        new Token<>(dtId1.getBytes(), "password1".getBytes(), dtId1.getKind(),
+            new Text("service1"));
+
+    Credentials credentials = new Credentials();
+    credentials.addToken(userText1, originalToken);
+
+    AtomicBoolean renewDelay = new AtomicBoolean(false);
+
+    // -1 is because of thread allocated to pool tracker runnable tasks
+    AtomicInteger counter = new AtomicInteger(-1);
+    DelegationTokenRenewer renewer = createNewDelegationTokenRenewerForTimeout(
+        yarnConf, counter, renewDelay);
+
+    MockRM rm1 = new TestSecurityMockRM(yarnConf) {
+      @Override
+      protected DelegationTokenRenewer createDelegationTokenRenewer() {
+        return renewer;
+      }
+    };
+
+    try {
+      rm1.start();
+      rm1.submitApp(200, "name", "user",
+          new HashMap<ApplicationAccessType, String>(), false, "default", 1,
+          credentials);
+
+      // Keep the wait below the JUnit timeout so a failure surfaces as this
+      // wait's own timeout instead of the test being killed.
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return (counter.get() == 1);
+        }
+      }, 2000, 20000);
+
+      // Ensure only one thread has been used in renewer service thread pool.
+      assertEquals(1, counter.get());
+    } finally {
+      rm1.close();
+    }
+  }
+
+  private DelegationTokenRenewer createNewDelegationTokenRenewerForTimeout(
+      Configuration conf, final AtomicInteger counter,
+      final AtomicBoolean renewDelay) {
+    DelegationTokenRenewer renew = new DelegationTokenRenewer() {
+      @Override
+      protected ThreadPoolExecutor createNewThreadPoolService(
+          Configuration conf) {
+        ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 3L,
+            TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
+          @Override
+          public Future<?> submit(Runnable r) {
+            counter.incrementAndGet();
+            return super.submit(r);
+          }
+        };
+        return pool;
+      }
+
+      @Override
+      protected void renewToken(final DelegationTokenToRenew dttr)
+          throws IOException {
+        try {
+          if (renewDelay.get()) {
+            // Sleep for 4 times the configured timeout
+            Thread.sleep(conf.getTimeDuration(
+                YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT,
+                YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT,
+                TimeUnit.MILLISECONDS) * 4);
+          }
+          super.renewToken(dttr);
+        } catch (InterruptedException e) {
+          // Raised when the task is cancelled with interruption; just log it.
+          LOG.info("Sleep Interrupted", e);
+        }
+      }
+    };
+    renew.setDelegationTokenRenewerPoolTracker(true);
+    return renew;
+  }
 }