diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index d3ed5032363..2ed51229c16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -36,10 +36,13 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -122,6 +125,17 @@ public static final long DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING = 10800000; // 3h + public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT = + YarnConfiguration.RM_PREFIX + "delegation-token-renewer.thread-timeout"; + public static final long DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT = + 60; // 60 Seconds + + public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL = + YarnConfiguration.RM_PREFIX + + "delegation-token-renewer.thread-retry-interval"; + public static final long DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL = + 60 * 1000; // 60 Seconds + public DelegationTokenRenewer() { super(DelegationTokenRenewer.class.getName()); } @@ -193,12 +207,30 @@ protected void serviceStart() throws Exception { private void processDelegationTokenRenewerEvent( DelegationTokenRenewerEvent evt) { serviceStateLock.readLock().lock(); + Future future = + null; try { if (isServiceStarted) { - renewerService.execute(new DelegationTokenRenewerRunnable(evt)); + future = + renewerService.submit(new DelegationTokenRenewerRunnable(evt)); + future.get(DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_TIMEOUT, + TimeUnit.SECONDS); } else { pendingEventQueue.add(evt); } + } catch (TimeoutException e) { + // Cancel thread and retry the same event in case of timeout + if (future != null && !future.isDone() && !future.isCancelled()) { + future.cancel(true); + renewalTimer.schedule(new TimerTask() { + @Override + public void run() { + pendingEventQueue.add(evt); + } + }, DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_RETRY_INTERVAL); + } + } catch (InterruptedException | ExecutionException e) { + LOG.info("Problem in submitting renew tasks in token renewer thread.", e); } finally { serviceStateLock.readLock().unlock(); }