diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index e519982151f..e238af359ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -20,13 +20,20 @@ import java.io.IOException; import java.util.EnumSet; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.server.nodemanager.security.NMDelegationTokenManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +82,9 @@ */ public class ApplicationImpl implements Application { + public static final Text HDFS_DELEGATION_KIND = + new Text("HDFS_DELEGATION_TOKEN"); + final Dispatcher dispatcher; final String user; // flow context is set only if the timeline service v.2 is enabled @@ -91,6 +101,8 @@ LoggerFactory.getLogger(ApplicationImpl.class); private LogAggregationContext logAggregationContext; + private Token managedToken; + private NMDelegationTokenManager delegationTokenManager; Map containers = new ConcurrentHashMap<>(); @@ -130,6 +142,7 @@ public ApplicationImpl(Dispatcher dispatcher, String user, this.context = context; this.appStateStore = context.getNMStateStore(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + this.delegationTokenManager = new NMDelegationTokenManager(conf); readLock = lock.readLock(); writeLock = lock.writeLock(); stateMachine = stateMachineFactory.make(this); @@ -341,6 +354,13 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { app.aclsManager.addApplication(app.getAppId(), app.applicationACLs); // Inform the logAggregator app.logAggregationContext = initEvent.getLogAggregationContext(); + if (UserGroupInformation.isSecurityEnabled()) { + try { + app.manageDelegationToken(); + } catch (IOException | InterruptedException e) { + LOG.warn("Delegation token handling failed for " + app.appId, e); + } + } app.dispatcher.getEventHandler().handle( new LogHandlerAppStartedEvent(app.appId, app.user, app.credentials, app.applicationACLs, @@ -614,6 +634,8 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { app.context.getNMTokenSecretManager().appFinished(app.getAppId()); updateCollectorStatus(app); + + app.cancelManagedToken(); } } @@ -690,4 +712,58 @@ public long getFlowRunId() { public void setFlowContext(FlowContext fc) { this.flowContext = fc; } + + /** + * Checks, whether the HDFS Delegation Token is active. If the current + * token for the application is invalid, swap it with a new one, requested + * on behalf of the user. + * @throws IOException + * @throws InterruptedException + */ + private void manageDelegationToken() + throws IOException, InterruptedException { + List> tokens = credentials.getAllTokens().stream() + .filter(token -> token.getKind().equals(HDFS_DELEGATION_KIND)) + .collect(Collectors.toList()); + if (tokens.isEmpty()) { + return; + } + + Token oldToken = tokens.get(0); + LOG.debug("Found token for " + appId + ": " + oldToken); + try { + delegationTokenManager.renewToken(oldToken); + LOG.debug("Retrieved HDFS Delegation Token is successfully renewed"); + } catch (SecretManager.InvalidToken e) { + LOG.debug("HDFS Delegation Token is invalid"); + Token newToken = delegationTokenManager.requestNewToken(user); + + if (newToken != null && newToken.getKind().equals(HDFS_DELEGATION_KIND)) { + LOG.debug("Requested a new HDFS Delegation Token: " + + newToken + "for " + appId); + credentials.addToken(oldToken.getService(), newToken); + managedToken = newToken; + } + } + } + + /** + * If the application has a self-managed token, cancel it on behalf of + * the user. + */ + private void cancelManagedToken() { + if (managedToken == null) { + return; + } + try { + delegationTokenManager.cancelToken(managedToken); + } catch (IOException | InterruptedException e) { + LOG.warn("Failed to cancel token " + managedToken, e); + } + } + + @VisibleForTesting + public void setDelegationTokenManager(NMDelegationTokenManager tokenManager) { + delegationTokenManager = tokenManager; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index e3907f8d251..17687667460 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -28,6 +28,14 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -61,6 +69,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.security.NMDelegationTokenManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -489,6 +498,46 @@ public void testNMTokenSecretManagerCleanup() { } } + @Test + public void testAppInitWithManagedDelegationToken() { + WrappedApplication wa = new WrappedApplication( + 1, 314159265358979L, "yak", 1); + SecurityUtil.setAuthenticationMethod(UserGroupInformation + .AuthenticationMethod.KERBEROS, wa.context.getConf()); + UserGroupInformation.setConfiguration(wa.context.getConf()); + + DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(); + Text owner = new Text("test"); + dtId.setOwner(owner); + dtId.setRealUser(owner); + dtId.setRenewer(owner); + TestDT token = new TestDT(dtId); + ApplicationImpl app = (ApplicationImpl)wa.app; + app.credentials.addToken(owner, token); + wa.initApplication(); + wa.appFinished(); + // Assert whether token has been renewed if it was not cancelled + assertEquals(token.status, "RENEWED"); + wa.appResourcesCleanedup(); + // Assert whether token has not been canceled on cleanup + assertEquals(token.status, "RENEWED"); + + WrappedApplication wa2 = new WrappedApplication( + 2, 314159265358979L, "yak", 2); + ApplicationImpl app2 = (ApplicationImpl)wa2.app; + TestDT newToken = new TestDT(dtId); + newToken.setID("NEW".getBytes()); + app2.setDelegationTokenManager(new TestDTM(wa2.context.getConf(), newToken)); + app2.credentials.addToken(owner, token); + token.cancelToken(); + wa2.initApplication(); + wa2.appFinished(); + wa2.appResourcesCleanedup(); + // Assert whether the new token was stored as a managed token and canceled on cleanup + Assert.assertTrue(newToken.isCanceled()); + + } + private class ContainerKillMatcher implements ArgumentMatcher { private ContainerId cId; @@ -545,7 +594,7 @@ public boolean matches(ContainerEvent argument) { WrappedApplication(int id, long timestamp, String user, int numContainers) { Configuration conf = new Configuration(); - + dispatcher = new DrainDispatcher(); containerTokenIdentifierMap = new HashMap(); @@ -568,7 +617,7 @@ public boolean matches(ContainerEvent argument) { nmTokenSecretMgr = mock(NMTokenSecretManagerInNM.class); stateStoreService = mock(NMStateStoreService.class); context = mock(Context.class); - + when(context.getContainerTokenSecretManager()).thenReturn( new NMContainerTokenSecretManager(conf)); when(context.getApplicationACLsManager()).thenReturn( @@ -583,12 +632,12 @@ public boolean matches(ContainerEvent argument) { masterKey.setBytes(ByteBuffer.wrap(new byte[] { (new Integer(123) .byteValue()) })); context.getContainerTokenSecretManager().setMasterKey(masterKey); - + this.user = user; this.appId = BuilderUtils.newApplicationId(timestamp, id); app = new ApplicationImpl( - dispatcher, this.user, appId, null, context); + dispatcher, this.user, appId, new Credentials(), context); containers = new ArrayList(); for (int i = 0; i < numContainers; i++) { Container container = createMockedContainer(this.appId, i); @@ -684,4 +733,79 @@ private Container createMockedContainer(ApplicationId appId, int containerId) { ContainerState.NEW, "", 0, Resource.newInstance(1024, 1))); return c; } + + private static class TestDTM extends NMDelegationTokenManager { + private final Token token; + + public TestDTM(Configuration conf, Token token) { + super(conf); + this.token = token; + } + + @Override + public Token requestNewToken(String impersonatedUser) + throws IOException { + return token; + } + } + + private static class TestDT extends Token { + public String status = "INIT"; + public static final String CANCELED = "CANCELED"; + + public TestDT(DelegationTokenIdentifier dtId1) { + super(dtId1, new SecretManager() { + @Override + protected byte[] createPassword(DelegationTokenIdentifier identifier) { + return new byte[0]; + } + + @Override + public byte[] retrievePassword(DelegationTokenIdentifier identifier) + throws InvalidToken { + return new byte[0]; + } + + @Override + public DelegationTokenIdentifier createIdentifier() { + return null; + } + }); + setKind(new Text("HDFS_DELEGATION_TOKEN")); + } + + public boolean isCanceled() {return status.equals(CANCELED);} + + public void cancelToken() {this.status=CANCELED;} + + @Override + public void cancel(Configuration conf) + throws IOException, InterruptedException { + status = CANCELED; + } + + @Override + public long renew(Configuration conf) throws IOException, + InterruptedException { + if (status.equals(CANCELED)) { + throw new SecretManager.InvalidToken("Status is canceled"); + } + status = "RENEWED"; + return 0L; + } + + public String toString() { + StringBuilder sb = new StringBuilder(1024); + + sb.append("id="); + String id = StringUtils.byteToHexString(this.getIdentifier()); + int idLen = id.length(); + sb.append(id.substring(idLen-6)); + sb.append(";k="); + sb.append(this.getKind()); + sb.append(";s="); + sb.append(this.getService()); + return sb.toString(); + } + } }