diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index e519982151f..c6ec223a30b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -19,14 +19,22 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; import java.io.IOException; +import java.util.ArrayList; import java.util.EnumSet; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.server.nodemanager.security.NMDelegationTokenManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +83,9 @@ */ public class ApplicationImpl implements Application { + public static final Text HDFS_DELEGATION_KIND = + new Text("HDFS_DELEGATION_TOKEN"); + final Dispatcher dispatcher; final String user; // flow context is set only if the timeline service v.2 is enabled @@ -91,6 +102,8 @@ LoggerFactory.getLogger(ApplicationImpl.class); private LogAggregationContext logAggregationContext; + private List> managedTokens = new ArrayList<>(); + private NMDelegationTokenManager delegationTokenManager; Map containers = new ConcurrentHashMap<>(); @@ -130,6 +143,7 @@ public ApplicationImpl(Dispatcher dispatcher, String user, this.context = context; this.appStateStore = context.getNMStateStore(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + this.delegationTokenManager = new NMDelegationTokenManager(conf); readLock = lock.readLock(); writeLock = lock.writeLock(); stateMachine = stateMachineFactory.make(this); @@ -341,6 +355,13 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { app.aclsManager.addApplication(app.getAppId(), app.applicationACLs); // Inform the logAggregator app.logAggregationContext = initEvent.getLogAggregationContext(); + if (UserGroupInformation.isSecurityEnabled()) { + try { + app.manageDelegationToken(); + } catch (IOException | InterruptedException e) { + LOG.warn("Delegation token handling failed for " + app.appId, e); + } + } app.dispatcher.getEventHandler().handle( new LogHandlerAppStartedEvent(app.appId, app.user, app.credentials, app.applicationACLs, @@ -614,6 +635,8 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { app.context.getNMTokenSecretManager().appFinished(app.getAppId()); updateCollectorStatus(app); + + app.cancelManagedToken(); } } @@ -690,4 +713,56 @@ public long getFlowRunId() { public void setFlowContext(FlowContext fc) { this.flowContext = fc; } + + /** + * Checks, whether the HDFS Delegation Token is active. If the current + * token for the application is invalid, swap it with a new one, requested + * on behalf of the user. + * @throws IOException + * @throws InterruptedException + */ + private void manageDelegationToken() + throws IOException, InterruptedException { + List> tokens = credentials.getAllTokens().stream() + .filter(token -> token.getKind().equals(HDFS_DELEGATION_KIND)) + .collect(Collectors.toList()); + + for (Token oldToken : tokens) { + LOG.debug("Found token for " + appId + ": " + oldToken); + try { + delegationTokenManager.renewToken(oldToken); + LOG.info("Retrieved HDFS Delegation Token is successfully renewed "); + } catch (SecretManager.InvalidToken e) { + LOG.debug("HDFS Delegation Token is invalid"); + Token newToken = delegationTokenManager.requestNewToken(user); + + if (newToken != null && newToken.getKind().equals( + HDFS_DELEGATION_KIND)) { + LOG.info("Requested a new HDFS Delegation Token: " + + newToken + "for " + appId); + credentials.addToken(oldToken.getService(), newToken); + managedTokens.add(oldToken); + } + } + } + } + + /** + * If the application has a self-managed token, cancel it on behalf of + * the user. + */ + private void cancelManagedToken() { + for (Token managedToken : managedTokens) { + try { + delegationTokenManager.cancelToken(managedToken); + } catch (IOException | InterruptedException e) { + LOG.warn("Failed to cancel token " + managedToken, e); + } + } + } + + @VisibleForTesting + public void setDelegationTokenManager(NMDelegationTokenManager tokenManager) { + delegationTokenManager = tokenManager; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMDelegationTokenManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMDelegationTokenManager.java new file mode 100644 index 00000000000..e824c30d4e0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMDelegationTokenManager.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.security; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +/** + * Provides an API for Delegation Token management. + * The operations on the tokens must be executed on behalf of + * a user (either the user logged in to Kerberos or a proxy user). + */ +public class NMDelegationTokenManager { + private final Configuration conf; + + public NMDelegationTokenManager(Configuration conf) { + this.conf = conf; + } + + /** + * Renews a token on behalf of the user logged in. + * @param token Token to be renewed + * @return Exit code of the call + * @throws IOException + * @throws InterruptedException + */ + public Long renewToken(Token token) + throws IOException, InterruptedException { + return renewToken(token, null); + } + + /** + * Renews a token on behalf of the user logged in, or a proxy user, if not + * null. + * @param token Token to be renewed + * @param impersonatedUser Proxy user + * @return Exit code of the call + * @throws IOException + * @throws InterruptedException + */ + public Long renewToken(Token token, String impersonatedUser) + throws IOException, InterruptedException { + UserGroupInformation ugi = getUserGroupInformation(impersonatedUser); + + return ugi.doAs((PrivilegedExceptionAction) () -> token.renew(conf)); + } + + /** + * Requests a new token from the filesystem on behalf of the user logged in, + * or a proxy user, if not null. + * @param impersonatedUser Proxy user + * @return The new token from the filesystem + * @throws IOException + */ + public Token requestNewToken(String impersonatedUser) throws IOException { + UserGroupInformation ugi = getUserGroupInformation(impersonatedUser); + + try (FileSystem fs = FileSystem.get(conf)) { + return fs.getDelegationToken( + ugi.getUserName()); + } + } + + /** + * Cancels the token on behalf of the user logged in. + * @param token Token to be canceled + * @throws IOException + * @throws InterruptedException + */ + public void cancelToken(Token token) + throws IOException, InterruptedException { + cancelToken(token, null); + } + + /** + * Cancels a token on behalf of the user logged in, or a proxy user, if not + * null. + * @param token Token to be canceled + * @param impersonatedUser Proxy user + * @throws IOException + * @throws InterruptedException + */ + public void cancelToken( + Token token, String impersonatedUser) + throws IOException, InterruptedException { + UserGroupInformation ugi = getUserGroupInformation(impersonatedUser); + + ugi.doAs((PrivilegedExceptionAction) () -> {token.cancel(conf); + return null; + }); + } + + private UserGroupInformation getUserGroupInformation(String impersonatedUser) + throws IOException { + UserGroupInformation ugi; + + if (impersonatedUser != null) { + ugi = UserGroupInformation.createProxyUser(impersonatedUser, + UserGroupInformation.getLoginUser()); + } else { + ugi = UserGroupInformation.getLoginUser(); + } + + return ugi; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index e3907f8d251..9f0c1ab523d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -28,6 +28,14 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -61,6 +69,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.security.NMDelegationTokenManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -489,6 +498,58 @@ public void testNMTokenSecretManagerCleanup() { } } + @Test + public void testAppInitWithManagedDelegationToken() { + WrappedApplication wa = new WrappedApplication( + 1, 314159265358979L, "yak", 1); + SecurityUtil.setAuthenticationMethod(UserGroupInformation + .AuthenticationMethod.KERBEROS, wa.context.getConf()); + UserGroupInformation.setConfiguration(wa.context.getConf()); + + DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(); + Text owner = new Text("test"); + dtId.setOwner(owner); + dtId.setRealUser(owner); + dtId.setRenewer(owner); + TestDT token = new TestDT(dtId); + ApplicationImpl app = (ApplicationImpl)wa.app; + app.credentials.addToken(owner, token); + wa.initApplication(); + wa.appFinished(); + // Assert whether token has been renewed if it was not cancelled + assertEquals(token.getStatus(), "RENEWED"); + wa.appResourcesCleanedup(); + // Assert whether token has not been canceled on cleanup + assertEquals(token.getStatus(), "RENEWED"); + + WrappedApplication wa2 = new WrappedApplication( + 2, 314159265358979L, "yak", 2); + ApplicationImpl app2 = (ApplicationImpl)wa2.app; + Text owner2 = new Text("test2"); + DelegationTokenIdentifier dtId2 = new DelegationTokenIdentifier(); + TestDT newToken = new TestDT(dtId); + TestDT newToken2 = new TestDT(dtId2); + + newToken.setID("NEW".getBytes()); + app2.setDelegationTokenManager( + new TestDTM(wa2.context.getConf(), newToken, newToken2)); + app2.credentials.addToken(owner, newToken); + app2.credentials.addToken(owner2, newToken2); + newToken.cancelToken(); + newToken2.cancelToken(); + Assert.assertTrue(newToken.isCanceled()); + Assert.assertTrue(newToken2.isCanceled()); + wa2.initApplication(); + Assert.assertEquals(newToken.getStatus(), "REQUESTED"); + Assert.assertEquals(newToken2.getStatus(), "REQUESTED"); + wa2.appFinished(); + wa2.appResourcesCleanedup(); + // Assert whether the new token was stored as a managed token and canceled on cleanup + Assert.assertTrue(newToken.isCanceled()); + Assert.assertTrue(newToken2.isCanceled()); + + } + private class ContainerKillMatcher implements ArgumentMatcher { private ContainerId cId; @@ -545,7 +606,7 @@ public boolean matches(ContainerEvent argument) { WrappedApplication(int id, long timestamp, String user, int numContainers) { Configuration conf = new Configuration(); - + dispatcher = new DrainDispatcher(); containerTokenIdentifierMap = new HashMap(); @@ -568,7 +629,7 @@ public boolean matches(ContainerEvent argument) { nmTokenSecretMgr = mock(NMTokenSecretManagerInNM.class); stateStoreService = mock(NMStateStoreService.class); context = mock(Context.class); - + when(context.getContainerTokenSecretManager()).thenReturn( new NMContainerTokenSecretManager(conf)); when(context.getApplicationACLsManager()).thenReturn( @@ -583,12 +644,12 @@ public boolean matches(ContainerEvent argument) { masterKey.setBytes(ByteBuffer.wrap(new byte[] { (new Integer(123) .byteValue()) })); context.getContainerTokenSecretManager().setMasterKey(masterKey); - + this.user = user; this.appId = BuilderUtils.newApplicationId(timestamp, id); app = new ApplicationImpl( - dispatcher, this.user, appId, null, context); + dispatcher, this.user, appId, new Credentials(), context); containers = new ArrayList(); for (int i = 0; i < numContainers; i++) { Container container = createMockedContainer(this.appId, i); @@ -684,4 +745,104 @@ private Container createMockedContainer(ApplicationId appId, int containerId) { ContainerState.NEW, "", 0, Resource.newInstance(1024, 1))); return c; } + + private static class TestDTM extends NMDelegationTokenManager { + private final List embeddedTokens; + + public TestDTM(Configuration conf, Token... embeddedToken) { + super(conf); + this.embeddedTokens = new ArrayList<>(); + for (Token t : embeddedToken) { + embeddedTokens.add((TestDT) t); + } + } + + + @Override + public Long renewToken(Token token, String impersonatedUser) throws IOException, InterruptedException { + TestDT testToken = (TestDT) token; + return token.renew(null); + } + + @Override + public Token requestNewToken(String impersonatedUser) + throws IOException { + for (TestDT t : embeddedTokens) { + if (t.isCanceled()) { + t.setStatus("REQUESTED"); + return t; + } + } + + return null; + } + } + + private static class TestDT extends Token { + public String status = "INIT"; + public static final String CANCELED = "CANCELED"; + + public TestDT(DelegationTokenIdentifier dtId1) { + super(dtId1, new SecretManager() { + @Override + protected byte[] createPassword(DelegationTokenIdentifier identifier) { + return new byte[0]; + } + + @Override + public byte[] retrievePassword(DelegationTokenIdentifier identifier) + throws InvalidToken { + return new byte[0]; + } + + @Override + public DelegationTokenIdentifier createIdentifier() { + return null; + } + }); + setKind(new Text("HDFS_DELEGATION_TOKEN")); + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public boolean isCanceled() {return status.equals(CANCELED);} + + public void cancelToken() {this.status=CANCELED;} + + @Override + public void cancel(Configuration conf) + throws IOException, InterruptedException { + status = CANCELED; + } + + @Override + public long renew(Configuration conf) throws IOException, + InterruptedException { + if (status.equals(CANCELED)) { + throw new SecretManager.InvalidToken("Status is canceled"); + } + status = "RENEWED"; + return 0L; + } + + public String toString() { + StringBuilder sb = new StringBuilder(1024); + + sb.append("id="); + String id = StringUtils.byteToHexString(this.getIdentifier()); + int idLen = id.length(); + sb.append(id.substring(idLen-6)); + sb.append(";k="); + sb.append(this.getKind()); + sb.append(";s="); + sb.append(this.getService()); + return sb.toString(); + } + } }