Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (revision 1466270) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (working copy) @@ -20,21 +20,15 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashSet; -import java.util.Iterator; import java.util.List; -import java.util.Map.Entry; import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -43,7 +37,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.service.AbstractService; @@ -60,27 +54,17 @@ public static final String SCHEME = "hdfs"; + // track apps and their tokens + final ConcurrentMap + trackedApps = new ConcurrentHashMap(); + final ConcurrentMap, ManagedToken> + trackedTokens = new ConcurrentHashMap, ManagedToken>(); + // global single timer (daemon) private Timer renewalTimer; - // delegation token canceler thread - private DelegationTokenCancelThread dtCancelThread = - new DelegationTokenCancelThread(); - - // managing the list of tokens using Map - // appId=>List - private Set delegationTokens = - Collections.synchronizedSet(new HashSet()); + private long tokenRemovalDelayMs = 0; - private final ConcurrentMap delayedRemovalMap = - new ConcurrentHashMap(); - - private long tokenRemovalDelayMs; - - private Thread delayedRemovalThread; - - private boolean tokenKeepAliveEnabled; - public DelegationTokenRenewer() { super(DelegationTokenRenewer.class.getName()); } @@ -88,26 +72,20 @@ @Override public synchronized void init(Configuration conf) { super.init(conf); - this.tokenKeepAliveEnabled = + boolean tokenKeepAliveEnabled = conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); - this.tokenRemovalDelayMs = + if (tokenKeepAliveEnabled) { + tokenRemovalDelayMs = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, - YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); + YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); + } } @Override public synchronized void start() { super.start(); - - dtCancelThread.start(); renewalTimer = new Timer(true); - if (tokenKeepAliveEnabled) { - delayedRemovalThread = - new Thread(new DelayedTokenRemovalRunnable(getConfig()), - "DelayedTokenCanceller"); - delayedRemovalThread.start(); - } } @Override @@ -115,144 +93,11 @@ if (renewalTimer != null) { renewalTimer.cancel(); } - delegationTokens.clear(); - - dtCancelThread.interrupt(); - try { - dtCancelThread.join(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - if (tokenKeepAliveEnabled && delayedRemovalThread != null) { - delayedRemovalThread.interrupt(); - try { - delayedRemovalThread.join(1000); - } catch (InterruptedException e) { - LOG.info("Interrupted while joining on delayed removal thread.", e); - } - } - + trackedTokens.clear(); + trackedApps.clear(); super.stop(); } - - /** - * class that is used for keeping tracks of DT to renew - * - */ - private static class DelegationTokenToRenew { - public final Token token; - public final ApplicationId applicationId; - public final Configuration conf; - public long expirationDate; - public TimerTask timerTask; - public final boolean shouldCancelAtEnd; - - public DelegationTokenToRenew( - ApplicationId jId, Token token, - Configuration conf, long expirationDate, boolean shouldCancelAtEnd) { - this.token = token; - this.applicationId = jId; - this.conf = conf; - this.expirationDate = expirationDate; - this.timerTask = null; - this.shouldCancelAtEnd = shouldCancelAtEnd; - if (this.token==null || this.applicationId==null || this.conf==null) { - throw new IllegalArgumentException("Invalid params to renew token" + - ";token=" + this.token + - ";appId=" + this.applicationId + - ";conf=" + this.conf); - } - } - - public void setTimerTask(TimerTask tTask) { - timerTask = tTask; - } - - @Override - public String toString() { - return token + ";exp=" + expirationDate; - } - - @Override - public boolean equals(Object obj) { - return obj instanceof DelegationTokenToRenew && - token.equals(((DelegationTokenToRenew)obj).token); - } - - @Override - public int hashCode() { - return token.hashCode(); - } - } - - private static class DelegationTokenCancelThread extends Thread { - private static class TokenWithConf { - Token token; - Configuration conf; - TokenWithConf(Token token, Configuration conf) { - this.token = token; - this.conf = conf; - } - } - private LinkedBlockingQueue queue = - new LinkedBlockingQueue(); - - public DelegationTokenCancelThread() { - super("Delegation Token Canceler"); - setDaemon(true); - } - public void cancelToken(Token token, - Configuration conf) { - TokenWithConf tokenWithConf = new TokenWithConf(token, conf); - while (!queue.offer(tokenWithConf)) { - LOG.warn("Unable to add token " + token + " for cancellation. " + - "Will retry.."); - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - - public void run() { - TokenWithConf tokenWithConf = null; - while (true) { - try { - tokenWithConf = queue.take(); - final TokenWithConf current = tokenWithConf; - if (LOG.isDebugEnabled()) { - LOG.debug("Canceling token " + tokenWithConf.token.getService()); - } - // need to use doAs so that http can find the kerberos tgt - UserGroupInformation.getLoginUser() - .doAs(new PrivilegedExceptionAction(){ - - @Override - public Void run() throws Exception { - current.token.cancel(current.conf); - return null; - } - }); - } catch (IOException e) { - LOG.warn("Failed to cancel token " + tokenWithConf.token + " " + - StringUtils.stringifyException(e)); - } catch (InterruptedException ie) { - return; - } catch (Throwable t) { - LOG.warn("Got exception " + StringUtils.stringifyException(t) + - ". Exiting.."); - System.exit(-1); - } - } - } - } - //adding token - private void addTokenToList(DelegationTokenToRenew t) { - delegationTokens.add(t); - } - /** * Add application tokens for renewal. * @param applicationId added application @@ -269,146 +114,39 @@ } if (LOG.isDebugEnabled()) { - LOG.debug("Registering tokens for renewal for:" + - " appId = " + applicationId); + LOG.debug("Registering tokens for renewal: appId=" + applicationId); } - - Collection > tokens = ts.getAllTokens(); - long now = System.currentTimeMillis(); - - // find tokens for renewal, but don't add timers until we know - // all renewable tokens are valid - Set dtrs = new HashSet(); - for(Token token : tokens) { - // first renew happens immediately - if (token.isManaged()) { - DelegationTokenToRenew dtr = - new DelegationTokenToRenew(applicationId, token, getConfig(), now, - shouldCancelAtEnd); - renewToken(dtr); - dtrs.add(dtr); - } - } - for (DelegationTokenToRenew dtr : dtrs) { - addTokenToList(dtr); - setTimerForTokenRenewal(dtr); - if (LOG.isDebugEnabled()) { - LOG.debug("Registering token for renewal for:" + - " service = " + dtr.token.getService() + - " for appId = " + applicationId); - } - } - } - - /** - * Task - to renew a token - * - */ - private class RenewalTimerTask extends TimerTask { - private DelegationTokenToRenew dttr; - private boolean cancelled = false; - - RenewalTimerTask(DelegationTokenToRenew t) { - dttr = t; - } - - @Override - public synchronized void run() { - if (cancelled) { - return; - } - Token token = dttr.token; - try { - renewToken(dttr); - if (LOG.isDebugEnabled()) { - LOG.debug("Renewing delegation-token for:" + token.getService() + - "; new expiration;" + dttr.expirationDate); + ManagedApp app = newInstance(applicationId); + try { + for (Token token : ts.getAllTokens()) { + if (token.isManaged()) { + ManagedToken managedToken = getInstance(token, shouldCancelAtEnd); + app.add(managedToken); + if (LOG.isDebugEnabled()) { + LOG.debug("Registering token " + managedToken + + " for appId " + applicationId); + } } - - setTimerForTokenRenewal(dttr);// set the next one - } catch (Exception e) { - LOG.error("Exception renewing token" + token + ". Not rescheduled", e); - removeFailedDelegationToken(dttr); } + } catch (InterruptedException ie) { + app.expunge(); // removed failed app + throw new IOException(ie); + } catch (IOException ioe) { + app.expunge(); // removed failed app + throw ioe; } - - @Override - public synchronized boolean cancel() { - cancelled = true; - return super.cancel(); - } } - - /** - * set task to renew the token - */ - private void setTimerForTokenRenewal(DelegationTokenToRenew token) - throws IOException { - - // calculate timer time - long expiresIn = token.expirationDate - System.currentTimeMillis(); - long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration - - // need to create new task every time - TimerTask tTask = new RenewalTimerTask(token); - token.setTimerTask(tTask); // keep reference to the timer - renewalTimer.schedule(token.timerTask, new Date(renewIn)); - } - - // renew a token - private void renewToken(final DelegationTokenToRenew dttr) - throws IOException { - // need to use doAs so that http can find the kerberos tgt - // NOTE: token renewers should be responsible for the correct UGI! - try { - dttr.expirationDate = UserGroupInformation.getLoginUser().doAs( - new PrivilegedExceptionAction(){ - @Override - public Long run() throws Exception { - return dttr.token.renew(dttr.conf); - } - }); - } catch (InterruptedException e) { - throw new IOException(e); - } - } - - // cancel a token - private void cancelToken(DelegationTokenToRenew t) { - if(t.shouldCancelAtEnd) { - dtCancelThread.cancelToken(t.token, t.conf); - } else { - LOG.info("Did not cancel "+t); - } - } - /** - * removing failed DT - * @param applicationId - */ - private void removeFailedDelegationToken(DelegationTokenToRenew t) { - ApplicationId applicationId = t.applicationId; - if (LOG.isDebugEnabled()) - LOG.debug("removing failed delegation token for appid=" + applicationId + - ";t=" + t.token.getService()); - delegationTokens.remove(t); - // cancel the timer - if(t.timerTask!=null) - t.timerTask.cancel(); - } - - /** * Removing delegation token for completed applications. * @param applicationId completed application */ public void applicationFinished(ApplicationId applicationId) { - if (!tokenKeepAliveEnabled) { - removeApplicationFromRenewal(applicationId); - } else { - delayedRemovalMap.put(applicationId, System.currentTimeMillis() - + tokenRemovalDelayMs); + // notify tracked app if it still exists + ManagedApp app = trackedApps.get(applicationId); + if (app != null) { + app.hasFinished(); } } @@ -421,81 +159,283 @@ * */ public void updateKeepAliveApplications(List appIds) { - if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) { + if (appIds != null) { for (ApplicationId appId : appIds) { - delayedRemovalMap.put(appId, System.currentTimeMillis() - + tokenRemovalDelayMs); + // update tracked app if it still exists + ManagedApp app = trackedApps.get(appId); + if (app != null) { + app.updateKeepAlive(); + } } } } - - private void removeApplicationFromRenewal(ApplicationId applicationId) { - synchronized (delegationTokens) { - Iterator it = delegationTokens.iterator(); - while(it.hasNext()) { - DelegationTokenToRenew dttr = it.next(); - if (dttr.applicationId.equals(applicationId)) { + + // create a unique app instance and add to concurrent map + ManagedApp newInstance(ApplicationId appId) throws IOException { + ManagedApp instance = new ManagedApp(appId); + if (trackedApps.putIfAbsent(appId, instance) != null) { + throw new IOException( + "Application " + appId + " has already been submitted"); + } + return instance; + } + + // create or obtain token instance from concurrent map + ManagedToken getInstance(Token token, boolean cancelAtEnd) + throws IOException, InterruptedException { + ManagedToken instance = trackedTokens.get(token); + if (instance == null) { + instance = new ManagedToken(token); + instance.scheduleRenewTask(); // validates token + // handle unlikely race that another app submitted with same token + ManagedToken priorInstance = trackedTokens.putIfAbsent(token, instance); + if (priorInstance != null) { + instance.abortScheduledTask(); // stop renewing the dup + instance = priorInstance; + } + } + // honor any app not wanting the token cancelled + instance.cancelAtEnd &= cancelAtEnd; + return instance; + } + + /** + * Class to manage applications with tokens + */ + private class ManagedApp { + private final ApplicationId appId; + private final Set appTokens = + Collections.synchronizedSet(new HashSet()); + // volatile due to use by both the thread updating the keepalive + // and the timer thread that will remove the app + private volatile long keepAliveUntil = 0; + + ManagedApp(ApplicationId appId) { + this.appId = appId; + } + + // associate token with this app + void add(ManagedToken managedToken) { + appTokens.add(managedToken); + managedToken.add(appId); + } + + // disassociate all tokens from this app + void expunge() { + if (trackedApps.remove(appId) != null) { + synchronized (appTokens) { if (LOG.isDebugEnabled()) { - LOG.debug("Removing delegation token for appId=" + applicationId + - "; token=" + dttr.token.getService()); + LOG.debug(appId + ": purging references to tokens " + appTokens); } - - // cancel the timer - if(dttr.timerTask!=null) - dttr.timerTask.cancel(); - - // cancel the token - cancelToken(dttr); - - it.remove(); + for (ManagedToken managedToken : appTokens) { + // may trigger timers for token cancellation + managedToken.remove(appId); + } + appTokens.clear(); } } } + + // start a timer task to eventually expunge this app + void hasFinished() { + updateKeepAlive(); + scheduleRemoval(); + } + + void updateKeepAlive() { + if (LOG.isDebugEnabled()) { + LOG.debug(appId + ": updating keepalive"); + } + keepAliveUntil = Time.now() + tokenRemovalDelayMs; + } + + void scheduleRemoval() { + Date removeDate = new Date(keepAliveUntil); + if (LOG.isDebugEnabled()) { + LOG.debug(appId + ": scheduling for removal at " + removeDate); + } + renewalTimer.schedule(new AppRemovalTask(), removeDate); + } + + private class AppRemovalTask extends RenewerTask { + @Override + public void runAction() { + if (keepAliveUntil > Time.now()) { + scheduleRemoval(); // keepalive was updated, reschedule + } else { + expunge(); // remove token refs, may trigger token cancellation + } + } + } + + @Override + public int hashCode() { + return appId.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return (this == obj) || + (obj instanceof ManagedApp && + appId.equals(((ManagedApp)obj).appId)); + } + + @Override + public String toString() { + return appId.toString(); + } } - + /** - * Takes care of cancelling app delegation tokens after the configured - * cancellation delay, taking into consideration keep-alive requests. - * + * class that is used for keeping tracks of DT to renew or cancel */ - private class DelayedTokenRemovalRunnable implements Runnable { - - private long waitTimeMs; - - DelayedTokenRemovalRunnable(Configuration conf) { - waitTimeMs = - conf.getLong( - YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS, - YarnConfiguration.DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS); + private class ManagedToken { + private final Token token; + private Set tokenApps = + Collections.synchronizedSet(new HashSet()); + private TimerTask timerTask; + private boolean cancelAtEnd = true; + + ManagedToken(Token token) { + this.token = token; } - - @Override - public void run() { - List toCancel = new ArrayList(); - while (!Thread.currentThread().isInterrupted()) { - Iterator> it = - delayedRemovalMap.entrySet().iterator(); - toCancel.clear(); - while (it.hasNext()) { - Entry e = it.next(); - if (e.getValue() < System.currentTimeMillis()) { - toCancel.add(e.getKey()); - } + + // associate app with this token + void add(ApplicationId appId) { + tokenApps.add(appId); + } + + // disassociate app from this token & cancel if not in use by other apps + void remove(ApplicationId appId) { + if (tokenApps.remove(appId) && tokenApps.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Last appId " + appId + " finished for token " + this); } - for (ApplicationId appId : toCancel) { - removeApplicationFromRenewal(appId); - delayedRemovalMap.remove(appId); + expunge(); // remove references to token + scheduleCancelTask(); + } + } + + void expunge() { + trackedTokens.remove(token); + tokenApps.clear(); + } + + void scheduleRenewTask() throws IOException, InterruptedException { + // schedule next renewal a little bit before the expiration + long expiresIn = UserGroupInformation.getLoginUser().doAs( + new PrivilegedExceptionAction() { + @Override + public Long run() throws IOException, InterruptedException { + return token.renew(getConfig()) - Time.now(); + } + }); + long renewIn = expiresIn - expiresIn/10; + scheduleTask(new TokenRenewTask(), renewIn); + } + + private void scheduleCancelTask() { + // stop any renewals that might be scheduled + abortScheduledTask(); + if (cancelAtEnd) { + if (LOG.isDebugEnabled()) { + LOG.debug("Schedule cancel of token " + this); } - synchronized (this) { - try { - wait(waitTimeMs); - } catch (InterruptedException e) { - LOG.info("Delayed Deletion Thread Interrupted. Shutting it down"); - return; + scheduleTask(new TokenCancelTask(), 0); + } + } + + // sync'ed to avoid scheduling races + private synchronized void scheduleTask(RenewerTask nextTask, long delay) { + abortScheduledTask(); + renewalTimer.schedule(nextTask, delay); + timerTask = nextTask; + } + + private synchronized void abortScheduledTask() { + if (timerTask != null) { + timerTask.cancel(); + timerTask = null; + } + } + + /** + * Task - to renew a token + */ + private class TokenRenewTask extends RenewerTask { + @Override + void runAction() throws InterruptedException { + try { + // renew and schedule the next renewal + scheduleRenewTask(); + if (LOG.isDebugEnabled()) { + LOG.debug("Renewed token " + ManagedToken.this + + " for apps " + tokenApps); } + } catch (IOException e) { + LOG.error("Failed to renew token " + ManagedToken.this + + " for apps " + tokenApps +" . Not rescheduled", e); } } } + + /** + * Task - to cancel a token + */ + private class TokenCancelTask extends RenewerTask { + @Override + void runAction() throws InterruptedException { + try { + UserGroupInformation.getLoginUser().doAs( + new PrivilegedExceptionAction() { + @Override + public Void run() throws IOException, InterruptedException { + token.cancel(getConfig()); + return null; + } + }); + if (LOG.isDebugEnabled()) { + LOG.debug("Cancelled token " + ManagedToken.this); + } + } catch (IOException e) { + LOG.warn("Failed to cancel token " + ManagedToken.this, e); + } + } + } + + @Override + public int hashCode() { + return token.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return (this == obj) || + (obj instanceof ManagedToken && + token.equals(((ManagedToken)obj).token)); + } + + @Override + public String toString() { + return "(kind=" + token.getKind() + + "; service=" + token.getService() + ")"; + } } -} + private static abstract class RenewerTask extends TimerTask { + @Override + public final void run() { + try { + runAction(); + } catch (InterruptedException e) { + return; + } catch (Exception e) { + LOG.warn("Ignoring unhandled exception", e); + } catch (Throwable e) { + LOG.fatal("Unhandled error. Exiting", e); + System.exit(-1); + } + } + abstract void runAction() throws InterruptedException; + } +} \ No newline at end of file Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java (revision 1466270) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java (working copy) @@ -22,9 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.*; import java.io.IOException; import java.net.URI; @@ -52,6 +50,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -124,6 +124,7 @@ @BeforeClass public static void setUpClass() throws Exception { + Logger.getRootLogger().setLevel(Level.DEBUG); conf = new Configuration(); // create a fake FileSystem (MyFS) and assosiate it @@ -599,4 +600,201 @@ endBarrier.await(); submitThread.join(); } + + @Test(timeout=16000) + public void testSharedTokensStayAlive() throws Exception { + Configuration conf = new Configuration(); + + ApplicationId app1 = createMockAppId(1); + ApplicationId app2 = createMockAppId(2); + ApplicationId app3 = createMockAppId(3); + + Token app1Token = createMockToken("app1-only"); + Token app2Token = createMockToken("app2-only"); + Token app3Token = createMockToken("app3-only"); + Token app13Token = createMockToken("app1+app3"); + Token app123Token = createMockToken("shared"); + + Credentials creds1 = createCredentials(app123Token, app1Token, app13Token); + Credentials creds2 = createCredentials(app123Token, app2Token); + Credentials creds3 = createCredentials(app123Token, app3Token, app13Token); + + // fire up the renewer + final DelegationTokenRenewer dtr = new DelegationTokenRenewer(); + dtr.init(conf); + dtr.start(); + + assertEquals(0, dtr.trackedApps.size()); + assertEquals(0, dtr.trackedTokens.size()); + + // add app1 + // running = app1 + dtr.addApplication(app1, creds1, true); + verify(app123Token, times(1)).renew(conf); // inc + verify(app1Token, times(1)).renew(conf); // inc + verify(app2Token, times(0)).renew(conf); + verify(app3Token, times(0)).renew(conf); + verify(app13Token, times(1)).renew(conf); // inc + + verify(app123Token, times(0)).cancel(conf); + verify(app1Token, times(0)).cancel(conf); + verify(app2Token, times(0)).cancel(conf); + verify(app3Token, times(0)).cancel(conf); + verify(app13Token, times(0)).cancel(conf); + + assertEquals(1, dtr.trackedApps.size()); + assertEquals(3, dtr.trackedTokens.size()); + + // add app2 + // running = app1 + app2 + dtr.addApplication(app2, creds2, true); + verify(app123Token, times(1)).renew(conf); // no change + verify(app1Token, times(1)).renew(conf); + verify(app2Token, times(1)).renew(conf); // inc + verify(app3Token, times(0)).renew(conf); + verify(app13Token, times(1)).renew(conf); + + verify(app123Token, times(0)).cancel(conf); + verify(app1Token, times(0)).cancel(conf); + verify(app2Token, times(0)).cancel(conf); + verify(app3Token, times(0)).cancel(conf); + verify(app13Token, times(0)).cancel(conf); + + assertEquals(2, dtr.trackedApps.size()); + assertEquals(4, dtr.trackedTokens.size()); + + // finish app2 + // running = app1 + dtr.applicationFinished(app2); + Thread.sleep(1000); // let cancel timer fire + verify(app123Token, times(1)).renew(conf); + verify(app1Token, times(1)).renew(conf); + verify(app2Token, times(1)).renew(conf); + verify(app3Token, times(0)).renew(conf); + verify(app13Token, times(1)).renew(conf); + + verify(app123Token, times(0)).cancel(conf); // no change (1 running) + verify(app1Token, times(0)).cancel(conf); + verify(app2Token, times(1)).cancel(conf); // inc + verify(app3Token, times(0)).cancel(conf); + verify(app13Token, times(0)).cancel(conf); + + assertEquals(1, dtr.trackedApps.size()); + assertEquals(3, dtr.trackedTokens.size()); + + // add app3 + // running = app1 + app3 + dtr.addApplication(app3, creds3, true); + verify(app123Token, times(1)).renew(conf); // no change (known) + verify(app1Token, times(1)).renew(conf); + verify(app2Token, times(1)).renew(conf); + verify(app3Token, times(1)).renew(conf); // inc + verify(app13Token, times(1)).renew(conf); // no change (known) + + verify(app123Token, times(0)).cancel(conf); + verify(app1Token, times(0)).cancel(conf); + verify(app2Token, times(1)).cancel(conf); + verify(app3Token, times(0)).cancel(conf); + verify(app13Token, times(0)).cancel(conf); + + assertEquals(2, dtr.trackedApps.size()); + assertEquals(4, dtr.trackedTokens.size()); + + // finish app1 + // running = app3 + dtr.applicationFinished(app1); + Thread.sleep(1000); // let cancel timer fire + verify(app123Token, times(1)).renew(conf); + verify(app1Token, times(1)).renew(conf); + verify(app2Token, times(1)).renew(conf); + verify(app3Token, times(1)).renew(conf); + verify(app13Token, times(1)).renew(conf); + + verify(app123Token, times(0)).cancel(conf); // no change (3 running) + verify(app1Token, times(1)).cancel(conf); // inc + verify(app2Token, times(1)).cancel(conf); + verify(app3Token, times(0)).cancel(conf); + verify(app13Token, times(0)).cancel(conf); // no change (3 running) + + assertEquals(1, dtr.trackedApps.size()); + assertEquals(3, dtr.trackedTokens.size()); + + // add app2, but don't cancel when done + // running = app3 + app2 + dtr.addApplication(app2, creds2, false); + verify(app123Token, times(1)).renew(conf); // no change (2 running) + verify(app1Token, times(1)).renew(conf); + verify(app2Token, times(2)).renew(conf); // inc + verify(app3Token, times(1)).renew(conf); + verify(app13Token, times(1)).renew(conf); + + verify(app123Token, times(0)).cancel(conf); + verify(app1Token, times(1)).cancel(conf); + verify(app2Token, times(1)).cancel(conf); + verify(app3Token, times(0)).cancel(conf); + verify(app13Token, times(0)).cancel(conf); + + assertEquals(2, dtr.trackedApps.size()); + assertEquals(4, dtr.trackedTokens.size()); + + // finish app2 + // running = app3 + dtr.applicationFinished(app2); + Thread.sleep(1000); // let cancel timer fire + verify(app123Token, times(1)).renew(conf); + verify(app1Token, times(1)).renew(conf); + verify(app2Token, times(2)).renew(conf); + verify(app3Token, times(1)).renew(conf); + verify(app13Token, times(1)).renew(conf); + + verify(app123Token, times(0)).cancel(conf); // no change (!cancel) + verify(app1Token, times(1)).cancel(conf); + verify(app2Token, times(1)).cancel(conf); // no change (!cancel) + verify(app3Token, times(0)).cancel(conf); + verify(app13Token, times(0)).cancel(conf); + + assertEquals(1, dtr.trackedApps.size()); + assertEquals(3, dtr.trackedTokens.size()); + + // finish app3 + dtr.applicationFinished(app3); + Thread.sleep(1000); // let cancel timer fire + verify(app123Token, times(1)).renew(conf); + verify(app1Token, times(1)).renew(conf); + verify(app2Token, times(2)).renew(conf); + verify(app3Token, times(1)).renew(conf); + verify(app13Token, times(1)).renew(conf); + + verify(app123Token, times(0)).cancel(conf); // no change (!cancel) + verify(app1Token, times(1)).cancel(conf); + verify(app2Token, times(1)).cancel(conf); + verify(app3Token, times(1)).cancel(conf); // inc + verify(app13Token, times(1)).cancel(conf); // inc + + assertEquals(0, dtr.trackedApps.size()); + assertEquals(0, dtr.trackedTokens.size()); + } + + private Token createMockToken(String name) throws Exception { + final Token token = mock(Token.class); + doReturn(true).when(token).isManaged(); + doReturn(Long.MAX_VALUE).when(token).renew(any(Configuration.class)); + doReturn(new Text(name)).when(token).getKind(); + doReturn(new Text(name)).when(token).getService(); + return token; + } + + private ApplicationId createMockAppId(int n) throws Exception { + final ApplicationId appId = mock(ApplicationId.class); + doReturn("App#"+n).when(appId).toString(); + return appId; + } + + private Credentials createCredentials(Token... tokens) { + Credentials creds = new Credentials(); + for (Token token : tokens) { + creds.addToken(token.getService(), token); + } + return creds; + } } Index: hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml =================================================================== --- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml (revision 1466283) +++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml (working copy) @@ -189,7 +189,7 @@ - +