diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 6ca5307..d76e7ec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -170,11 +170,6 @@ protected void serviceInit(Configuration conf) throws Exception { AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor(); addService(amFinishingMonitor); - if (UserGroupInformation.isSecurityEnabled()) { - this.delegationTokenRenewer = createDelegationTokenRenewer(); - addService(delegationTokenRenewer); - } - this.containerTokenSecretManager = createContainerTokenSecretManager(conf); this.nmTokenSecretManager = createNMTokenSecretManager(conf); @@ -201,6 +196,10 @@ protected void serviceInit(Configuration conf) throws Exception { ExitUtil.terminate(1, e); } + if (UserGroupInformation.isSecurityEnabled()) { + this.delegationTokenRenewer = createDelegationTokenRenewer(); + } + this.rmContext = new RMContextImpl(this.rmDispatcher, rmStore, this.containerAllocationExpirer, amLivelinessMonitor, @@ -272,7 +271,9 @@ protected void serviceInit(Configuration conf) throws Exception { this.applicationMasterLauncher); addService(applicationMasterLauncher); - + if (UserGroupInformation.isSecurityEnabled()) { + addService(delegationTokenRenewer); + } new RMNMInfo(this.rmContext, this.scheduler); super.serviceInit(conf); @@ -627,13 +628,6 @@ protected void serviceStart() throws Exception { this.containerTokenSecretManager.start(); this.nmTokenSecretManager.start(); - // Explicitly start DTRenewer too in secure mode before kicking recovery as - // tokens will start getting added for renewal as part of the recovery - // process itself. - if (UserGroupInformation.isSecurityEnabled()) { - this.delegationTokenRenewer.start(); - } - RMStateStore rmStore = rmContext.getStateStore(); // The state store needs to start irrespective of recoveryEnabled as apps // need events to move to further states. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index bad5eab..062594b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -80,6 +81,9 @@ private long tokenRemovalDelayMs; private Thread delayedRemovalThread; + private AtomicBoolean isServiceStarted = new AtomicBoolean(false); + private List pendingTokenForRenewal = + new ArrayList(); private boolean tokenKeepAliveEnabled; @@ -100,7 +104,6 @@ protected synchronized void serviceInit(Configuration conf) throws Exception { @Override protected void serviceStart() throws Exception { - dtCancelThread.start(); renewalTimer = new Timer(true); if (tokenKeepAliveEnabled) { @@ -109,6 +112,13 @@ protected void serviceStart() throws Exception { "DelayedTokenCanceller"); delayedRemovalThread.start(); } + synchronized (isServiceStarted) { + isServiceStarted.set(true); + for (DelegationTokenToRenew token : pendingTokenForRenewal) { + renewIfServiceIsStarted(token); + } + pendingTokenForRenewal.clear(); + } super.serviceStart(); } @@ -293,22 +303,27 @@ public void addApplication( // all renewable tokens are valid Set dtrs = new HashSet(); for(Token token : tokens) { - // first renew happens immediately if (token.isManaged()) { - DelegationTokenToRenew dtr = - new DelegationTokenToRenew(applicationId, token, getConfig(), now, - shouldCancelAtEnd); - renewToken(dtr); - dtrs.add(dtr); + renewIfServiceIsStarted(new DelegationTokenToRenew(applicationId, + token, getConfig(), now, shouldCancelAtEnd)); } } - for (DelegationTokenToRenew dtr : dtrs) { - addTokenToList(dtr); - setTimerForTokenRenewal(dtr); - if (LOG.isDebugEnabled()) { - LOG.debug("Registering token for renewal for:" + - " service = " + dtr.token.getService() + - " for appId = " + applicationId); + } + + protected void renewIfServiceIsStarted(DelegationTokenToRenew dtr) + throws IOException { + synchronized (isServiceStarted) { + if (isServiceStarted.get()) { + renewToken(dtr); + addTokenToList(dtr); + setTimerForTokenRenewal(dtr); + if (LOG.isDebugEnabled()) { + LOG.debug("Registering token for renewal for:" + + " service = " + dtr.token.getService() + + " for appId = " + dtr.applicationId); + } + } else { + pendingTokenForRenewal.add(dtr); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index c0f480b..caa1fde 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; @@ -35,6 +34,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -90,7 +90,7 @@ public void setup() throws UnknownHostException { conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); - rmAddr = new InetSocketAddress(InetAddress.getLocalHost(), 123); + rmAddr = new InetSocketAddress("localhost", 8032); } @Test (timeout=180000) @@ -592,7 +592,12 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { @Test public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); - + + conf.set( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + conf.set(YarnConfiguration.RM_ADDRESS, "localhost:8032"); + UserGroupInformation.setConfiguration(conf); MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); @@ -614,6 +619,8 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { // request a token and add into credential GetDelegationTokenRequest request1 = GetDelegationTokenRequest.newInstance("renewer1"); + UserGroupInformation.getCurrentUser().setAuthenticationMethod( + AuthMethod.KERBEROS); GetDelegationTokenResponse response1 = rm1.getClientRMService().getDelegationToken(request1); org.apache.hadoop.yarn.api.records.Token delegationToken1 = @@ -644,7 +651,7 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { rm1.getRMDTSecretManager().getAllTokens(); Assert.assertEquals(tokenIdentSet, allTokensRM1.keySet()); Assert.assertEquals(allTokensRM1, rmDTState); - + // assert sequence number is saved Assert.assertEquals( rm1.getRMDTSecretManager().getLatestDTSequenceNumber(), @@ -682,7 +689,7 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { // assert master keys and tokens are populated back to DTSecretManager Map allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens(); - Assert.assertEquals(allTokensRM1, allTokensRM2); + Assert.assertEquals(allTokensRM2.keySet(), allTokensRM1.keySet()); // rm2 has its own master keys when it starts, we use containsAll here Assert.assertTrue(rm2.getRMDTSecretManager().getAllMasterKeys() .containsAll(allKeysRM1)); @@ -735,15 +742,25 @@ public TestSecurityMockRM(Configuration conf, RMStateStore store) { } @Override - protected void doSecureLogin() throws IOException { - // Do nothing. + protected ClientRMService createClientRMService() { + return new ClientRMService(getRMContext(), getResourceScheduler(), + rmAppManager, applicationACLsManager, rmDTSecretManager){ + @Override + protected void serviceStart() throws Exception { + RMDelegationTokenIdentifier.Renewer.setSecretManager( + rmDTSecretManager, rmAddr); + } + + @Override + protected void serviceStop() throws Exception { + //do nothing + } + }; } @Override - protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); - RMDelegationTokenIdentifier.Renewer.setSecretManager( - this.getRMDTSecretManager(), rmAddr); + protected void doSecureLogin() throws IOException { + // Do nothing. } } }