diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index e519982151f..f3d45fba8f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -19,14 +19,22 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.EnumSet; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +83,9 @@ */ public class ApplicationImpl implements Application { + public static final Text HDFS_DELEGATION_KIND = + new Text("HDFS_DELEGATION_TOKEN"); + final Dispatcher dispatcher; final String user; // flow context is set only if the timeline service v.2 is enabled @@ -91,6 +102,7 @@ LoggerFactory.getLogger(ApplicationImpl.class); private LogAggregationContext logAggregationContext; + private Token managedToken; Map containers = new ConcurrentHashMap<>(); @@ -341,6 +353,13 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { app.aclsManager.addApplication(app.getAppId(), app.applicationACLs); // Inform the logAggregator app.logAggregationContext = initEvent.getLogAggregationContext(); + if (UserGroupInformation.isSecurityEnabled()) { + try { + app.manageDelegationToken(); + } catch (IOException | InterruptedException e) { + LOG.warn("Delegation token handling failed for " + app.appId, e); + } + } app.dispatcher.getEventHandler().handle( new LogHandlerAppStartedEvent(app.appId, app.user, app.credentials, app.applicationACLs, @@ -614,6 +633,8 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { app.context.getNMTokenSecretManager().appFinished(app.getAppId()); updateCollectorStatus(app); + + app.cancelManagedToken(); } } @@ -690,4 +711,76 @@ public long getFlowRunId() { public void setFlowContext(FlowContext fc) { this.flowContext = fc; } + + /** + * Checks, whether the HDFS Delegation Token is active. If the current + * token for the application is invalid, swap it with a new one, requested + * on behalf of the user. + * @throws IOException + * @throws InterruptedException + */ + private void manageDelegationToken() + throws IOException, InterruptedException { + UserGroupInformation proxyUser = + UserGroupInformation.createProxyUser(user, + UserGroupInformation.getLoginUser()); + + List> tokens = credentials.getAllTokens().stream() + .filter(token -> token.getKind().equals(HDFS_DELEGATION_KIND)) + .collect(Collectors.toList()); + if (tokens.isEmpty()) { + return; + } + + Token oldToken = tokens.get(0); + LOG.debug("Found token for " + appId + ": " + oldToken); + try { + UserGroupInformation.getLoginUser().doAs( + (PrivilegedExceptionAction) + () -> oldToken.renew(context.getConf())); + oldToken.renew(context.getConf()); + LOG.debug("Retrieved HDFS Delegation Token is successfully renewed"); + } catch (SecretManager.InvalidToken e) { + LOG.debug("HDFS Delegation Token is invalid"); + Token newToken = + proxyUser.doAs((PrivilegedExceptionAction>) () -> { + try (FileSystem fs = FileSystem.get(context.getConf())) { + Token tokenIssued = fs.getDelegationToken( + UserGroupInformation.getLoginUser().getUserName()); + if (tokenIssued == null || + !tokenIssued.getKind().equals(HDFS_DELEGATION_KIND)) { + return null; + } else { + LOG.debug("Requested a new HDFS Delegation Token: " + + tokenIssued + "for " + appId); + return tokenIssued; + } + } + }); + + if (newToken != null) { + credentials.addToken(oldToken.getService(), newToken); + managedToken = newToken; + } + } + } + + /** + * If the application has a self-managed token, cancel it on behalf of + * the user. + */ + private void cancelManagedToken() { + if (managedToken == null) { + return; + } + + try { + UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction) + () -> { managedToken.cancel(context.getConf()); + return null; + }); + } catch (IOException | InterruptedException e) { + LOG.warn("Failed to cancel token " + managedToken, e); + } + } }