diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index d2888e7..bc7e006 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -43,10 +43,10 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; @@ -78,7 +78,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.RPCUtil; @@ -88,7 +87,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -160,9 +158,6 @@ protected void serviceStart() throws Exception { this.server.start(); clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS, server.getListenerAddress()); - // enable RM to short-circuit token operations directly to itself - RMDelegationTokenIdentifier.Renewer.setSecretManager( - rmDTSecretManager, clientBindAddress); super.serviceStart(); } @@ -626,4 +621,8 @@ private boolean isAllowedDelegationTokenOp() throws IOException { return true; } } + + public RMDelegationTokenSecretManager getDelegationTokenSecretManager() { + return this.rmDTSecretManager; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index c0b372a..54d531a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -169,11 +169,6 @@ protected void serviceInit(Configuration conf) throws Exception { AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor(); addService(amFinishingMonitor); - if (UserGroupInformation.isSecurityEnabled()) { - this.delegationTokenRenewer = createDelegationTokenRenewer(); - addService(delegationTokenRenewer); - } - this.containerTokenSecretManager = createContainerTokenSecretManager(conf); this.nmTokenSecretManager = createNMTokenSecretManager(conf); @@ -200,6 +195,10 @@ protected void serviceInit(Configuration conf) throws Exception { ExitUtil.terminate(1, e); } + if (UserGroupInformation.isSecurityEnabled()) { + this.delegationTokenRenewer = createDelegationTokenRenewer(); + } + this.rmContext = new RMContextImpl(this.rmDispatcher, rmStore, this.containerAllocationExpirer, amLivelinessMonitor, @@ -271,7 +270,10 @@ protected void serviceInit(Configuration conf) throws Exception { this.applicationMasterLauncher); addService(applicationMasterLauncher); - + if (UserGroupInformation.isSecurityEnabled()) { + addService(delegationTokenRenewer); + delegationTokenRenewer.setClientRMService(clientRM); + } new RMNMInfo(this.rmContext, this.scheduler); super.serviceInit(conf); @@ -620,13 +622,6 @@ protected void serviceStart() throws Exception { this.containerTokenSecretManager.start(); this.nmTokenSecretManager.start(); - // Explicitly start DTRenewer too in secure mode before kicking recovery as - // tokens will start getting added for renewal as part of the recovery - // process itself. - if (UserGroupInformation.isSecurityEnabled()) { - this.delegationTokenRenewer.start(); - } - RMStateStore rmStore = rmContext.getStateStore(); // The state store needs to start irrespective of recoveryEnabled as apps // need events to move to further states. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index bad5eab..ebbc4d4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -47,6 +48,8 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import com.google.common.annotations.VisibleForTesting; @@ -64,6 +67,7 @@ // global single timer (daemon) private Timer renewalTimer; + private ClientRMService rmService; // delegation token canceler thread private DelegationTokenCancelThread dtCancelThread = @@ -80,6 +84,9 @@ private long tokenRemovalDelayMs; private Thread delayedRemovalThread; + private AtomicBoolean isServiceStarted = new AtomicBoolean(false); + private List pendingTokenForRenewal = + new ArrayList(); private boolean tokenKeepAliveEnabled; @@ -100,7 +107,6 @@ protected synchronized void serviceInit(Configuration conf) throws Exception { @Override protected void serviceStart() throws Exception { - dtCancelThread.start(); renewalTimer = new Timer(true); if (tokenKeepAliveEnabled) { @@ -109,6 +115,16 @@ protected void serviceStart() throws Exception { "DelayedTokenCanceller"); delayedRemovalThread.start(); } + // enable RM to short-circuit token operations directly to itself + RMDelegationTokenIdentifier.Renewer.setSecretManager( + rmService.getDelegationTokenSecretManager(), rmService.getBindAddress()); + // Delegation token renewal is delayed until ClientRMService starts. As + // it is required to short circuit the token renewal calls. + synchronized (isServiceStarted) { + isServiceStarted.set(true); + renewIfServiceIsStarted(pendingTokenForRenewal); + pendingTokenForRenewal.clear(); + } super.serviceStart(); } @@ -291,24 +307,41 @@ public void addApplication( // find tokens for renewal, but don't add timers until we know // all renewable tokens are valid - Set dtrs = new HashSet(); + // At RM restart it is safe to assume that all the previously added tokens + // are valid + List tokenList = + new ArrayList(); for(Token token : tokens) { - // first renew happens immediately if (token.isManaged()) { - DelegationTokenToRenew dtr = - new DelegationTokenToRenew(applicationId, token, getConfig(), now, - shouldCancelAtEnd); - renewToken(dtr); - dtrs.add(dtr); + tokenList.add(new DelegationTokenToRenew(applicationId, + token, getConfig(), now, shouldCancelAtEnd)); } } - for (DelegationTokenToRenew dtr : dtrs) { - addTokenToList(dtr); - setTimerForTokenRenewal(dtr); - if (LOG.isDebugEnabled()) { - LOG.debug("Registering token for renewal for:" + - " service = " + dtr.token.getService() + - " for appId = " + applicationId); + if (!tokenList.isEmpty()){ + renewIfServiceIsStarted(tokenList); + } + } + + protected void renewIfServiceIsStarted(List dtrs) + throws IOException { + synchronized (isServiceStarted) { + if (isServiceStarted.get()) { + // Renewing token and adding it to timer calls are separated purposefully + // If user provides incorrect token then it should not be added for + // renewal. + for (DelegationTokenToRenew dtr : dtrs) { + renewToken(dtr); + } + for (DelegationTokenToRenew dtr: dtrs) { + addTokenToList(dtr); + setTimerForTokenRenewal(dtr); + if (LOG.isDebugEnabled()) { + LOG.debug("Registering token for renewal for:" + " service = " + + dtr.token.getService() + " for appId = " + dtr.applicationId); + } + } + } else { + pendingTokenForRenewal.addAll(dtrs); } } } @@ -467,6 +500,10 @@ private void removeApplicationFromRenewal(ApplicationId applicationId) { } } } + + public void setClientRMService(ClientRMService rmService) { + this.rmService = rmService; + } /** * Takes care of cancelling app delegation tokens after the configured @@ -512,5 +549,4 @@ public void run() { } } } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index c0f480b..caa1fde 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; @@ -35,6 +34,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -90,7 +90,7 @@ public void setup() throws UnknownHostException { conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); - rmAddr = new InetSocketAddress(InetAddress.getLocalHost(), 123); + rmAddr = new InetSocketAddress("localhost", 8032); } @Test (timeout=180000) @@ -592,7 +592,12 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { @Test public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); - + + conf.set( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + conf.set(YarnConfiguration.RM_ADDRESS, "localhost:8032"); + UserGroupInformation.setConfiguration(conf); MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); @@ -614,6 +619,8 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { // request a token and add into credential GetDelegationTokenRequest request1 = GetDelegationTokenRequest.newInstance("renewer1"); + UserGroupInformation.getCurrentUser().setAuthenticationMethod( + AuthMethod.KERBEROS); GetDelegationTokenResponse response1 = rm1.getClientRMService().getDelegationToken(request1); org.apache.hadoop.yarn.api.records.Token delegationToken1 = @@ -644,7 +651,7 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { rm1.getRMDTSecretManager().getAllTokens(); Assert.assertEquals(tokenIdentSet, allTokensRM1.keySet()); Assert.assertEquals(allTokensRM1, rmDTState); - + // assert sequence number is saved Assert.assertEquals( rm1.getRMDTSecretManager().getLatestDTSequenceNumber(), @@ -682,7 +689,7 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { // assert master keys and tokens are populated back to DTSecretManager Map allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens(); - Assert.assertEquals(allTokensRM1, allTokensRM2); + Assert.assertEquals(allTokensRM2.keySet(), allTokensRM1.keySet()); // rm2 has its own master keys when it starts, we use containsAll here Assert.assertTrue(rm2.getRMDTSecretManager().getAllMasterKeys() .containsAll(allKeysRM1)); @@ -735,15 +742,25 @@ public TestSecurityMockRM(Configuration conf, RMStateStore store) { } @Override - protected void doSecureLogin() throws IOException { - // Do nothing. + protected ClientRMService createClientRMService() { + return new ClientRMService(getRMContext(), getResourceScheduler(), + rmAppManager, applicationACLsManager, rmDTSecretManager){ + @Override + protected void serviceStart() throws Exception { + RMDelegationTokenIdentifier.Renewer.setSecretManager( + rmDTSecretManager, rmAddr); + } + + @Override + protected void serviceStop() throws Exception { + //do nothing + } + }; } @Override - protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); - RMDelegationTokenIdentifier.Renewer.setSecretManager( - this.getRMDTSecretManager(), rmAddr); + protected void doSecureLogin() throws IOException { + // Do nothing. } } }