diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 104ef4a..d92589c 100644 --- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -299,4 +299,10 @@ + + + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index e661344..9495e44 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -282,30 +282,28 @@ protected void submitApplication( this.applicationACLsManager.addApplication(applicationId, submissionContext.getAMContainerSpec().getApplicationACLs()); - try { - // Setup tokens for renewal - if (UserGroupInformation.isSecurityEnabled()) { - this.rmContext.getDelegationTokenRenewer().addApplication( - applicationId,parseCredentials(submissionContext), - submissionContext.getCancelTokensWhenComplete() - ); + + if ( UserGroupInformation.isSecurityEnabled()) { + Credentials ts = null; + try { + ts = parseCredentials(submissionContext); + } catch (Throwable t) { + LOG.warn("Unable to parse credentials from submission context.", t); + // Sending APP_REJECTED is fine, since we assume that the + // RMApp is in NEW state and thus we havne't yet informed the + // Scheduler about the existence of the application + rmContext.getDispatcher().getEventHandler().handle( + new RMAppRejectedEvent(applicationId, t.getMessage())); + return; } - } catch (IOException ie) { - LOG.warn( - "Unable to add the application to the delegation token renewer.", - ie); - // Sending APP_REJECTED is fine, since we assume that the - // RMApp is in NEW state and thus we havne't yet informed the - // Scheduler about the existence of the application + this.rmContext.getDelegationTokenRenewer().addApplication(applicationId, + ts, submissionContext.getCancelTokensWhenComplete(), isRecovered); + } else { + // All done, start the RMApp this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppRejectedEvent(applicationId, ie.getMessage())); - throw RPCUtil.getRemoteException(ie); + new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER: + RMAppEventType.START)); } - - // All done, start the RMApp - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER: - RMAppEventType.START)); } private Credentials parseCredentials(ApplicationSubmissionContext application) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index a58b917..ad362e0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -48,8 +47,13 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import com.google.common.annotations.VisibleForTesting; @@ -72,6 +76,7 @@ // delegation token canceler thread private DelegationTokenCancelThread dtCancelThread = new DelegationTokenCancelThread(); + private AsyncDispatcher dispatcher; // managing the list of tokens using Map // appId=>List @@ -84,9 +89,6 @@ private long tokenRemovalDelayMs; private Thread delayedRemovalThread; - private boolean isServiceStarted = false; - private List pendingTokenForRenewal = - new ArrayList(); private boolean tokenKeepAliveEnabled; @@ -102,6 +104,10 @@ protected synchronized void serviceInit(Configuration conf) throws Exception { this.tokenRemovalDelayMs = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); + this.dispatcher = new AsyncDispatcher(); + this.dispatcher.init(conf); + this.dispatcher.register(DelegationTokenRenewerEventType.class, + new ForwardingEventHandler()); super.serviceInit(conf); } @@ -119,11 +125,7 @@ protected void serviceStart() throws Exception { RMDelegationTokenIdentifier.Renewer.setSecretManager( rmContext.getRMDelegationTokenSecretManager(), rmContext.getClientRMService().getBindAddress()); - // Delegation token renewal is delayed until ClientRMService starts. As - // it is required to short circuit the token renewal calls. - isServiceStarted = true; - renewIfServiceIsStarted(pendingTokenForRenewal); - pendingTokenForRenewal.clear(); + this.dispatcher.start(); super.serviceStart(); } @@ -289,48 +291,53 @@ private void addTokenToList(DelegationTokenToRenew t) { * done else false. * @throws IOException */ + @SuppressWarnings("unchecked") public void addApplication( - ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd) + ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd, + boolean isApplicationRecovered) { + dispatcher.getEventHandler().handle( + new DelegationTokenRenewerAppSubmitEvent(applicationId, ts, + shouldCancelAtEnd, isApplicationRecovered, + DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION)); + } + + private void addApplication(DelegationTokenRenewerAppSubmitEvent evt) throws IOException { + ApplicationId applicationId = evt.getAppicationId(); + Credentials ts = evt.getCredentials(); + boolean shouldCancelAtEnd = evt.shouldCancelAtEnd(); if (ts == null) { - return; //nothing to add + return; // nothing to add } - + if (LOG.isDebugEnabled()) { - LOG.debug("Registering tokens for renewal for:" + + LOG.debug("Registering tokens for renewal for:" + " appId = " + applicationId); } - - Collection > tokens = ts.getAllTokens(); + + Collection> tokens = ts.getAllTokens(); long now = System.currentTimeMillis(); - + // find tokens for renewal, but don't add timers until we know // all renewable tokens are valid // At RM restart it is safe to assume that all the previously added tokens // are valid List tokenList = new ArrayList(); - for(Token token : tokens) { + for (Token token : tokens) { if (token.isManaged()) { tokenList.add(new DelegationTokenToRenew(applicationId, token, getConfig(), now, shouldCancelAtEnd)); } } - if (!tokenList.isEmpty()){ - renewIfServiceIsStarted(tokenList); - } - } - - protected void renewIfServiceIsStarted(List dtrs) - throws IOException { - if (isServiceStarted) { + if (!tokenList.isEmpty()) { // Renewing token and adding it to timer calls are separated purposefully // If user provides incorrect token then it should not be added for // renewal. - for (DelegationTokenToRenew dtr : dtrs) { + for (DelegationTokenToRenew dtr : tokenList) { renewToken(dtr); } - for (DelegationTokenToRenew dtr : dtrs) { + for (DelegationTokenToRenew dtr : tokenList) { addTokenToList(dtr); setTimerForTokenRenewal(dtr); if (LOG.isDebugEnabled()) { @@ -338,11 +345,9 @@ protected void renewIfServiceIsStarted(List dtrs) + dtr.token.getService() + " for appId = " + dtr.applicationId); } } - } else { - pendingTokenForRenewal.addAll(dtrs); } } - + /** * Task - to renew a token * @@ -448,15 +453,22 @@ private void removeFailedDelegationToken(DelegationTokenToRenew t) { * Removing delegation token for completed applications. * @param applicationId completed application */ + @SuppressWarnings("unchecked") public void applicationFinished(ApplicationId applicationId) { + dispatcher.getEventHandler().handle( + new DelegationTokenRenewerEvent(applicationId, + DelegationTokenRenewerEventType.FINISH_APPLICATION)); + } + + private void applicationFinished(DelegationTokenRenewerEvent evt) { if (!tokenKeepAliveEnabled) { - removeApplicationFromRenewal(applicationId); + removeApplicationFromRenewal(evt.getAppicationId()); } else { - delayedRemovalMap.put(applicationId, System.currentTimeMillis() + delayedRemovalMap.put(evt.getAppicationId(), System.currentTimeMillis() + tokenRemovalDelayMs); } } - + /** * Add a list of applications to the keep alive list. If an appId already * exists, update it's keep-alive time. @@ -546,4 +558,52 @@ public void run() { public void setRMContext(RMContext rmContext) { this.rmContext = rmContext; } + + /** + * EventHandler implementation which forward events to the + * DelegationTokenRenewer. This hides the EventHandler methods of the store + * from its public interface + */ + private final class ForwardingEventHandler + implements EventHandler { + + @Override + @SuppressWarnings("unchecked") + public void handle(DelegationTokenRenewerEvent event) { + if (event.getType().equals( + DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION)) { + DelegationTokenRenewerAppSubmitEvent appSubmitEvt = + (DelegationTokenRenewerAppSubmitEvent) event; + handleDTRenewerEvent(appSubmitEvt); + } else if (event.getType().equals( + DelegationTokenRenewerEventType.FINISH_APPLICATION)) { + rmContext.getDelegationTokenRenewer().applicationFinished(event); + } + + } + + @SuppressWarnings("unchecked") + private void handleDTRenewerEvent( + DelegationTokenRenewerAppSubmitEvent event) { + try { + // Setup tokens for renewal + if (UserGroupInformation.isSecurityEnabled()) { + rmContext.getDelegationTokenRenewer().addApplication(event); + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(event.getAppicationId(), + event.isApplicationRecovered() ? RMAppEventType.RECOVER + : RMAppEventType.START)); + } + } catch (Throwable t) { + LOG.warn( + "Unable to add the application to the delegation token renewer.", + t); + // Sending APP_REJECTED is fine, since we assume that the + // RMApp is in NEW state and thus we havne't yet informed the + // Scheduler about the existence of the application + rmContext.getDispatcher().getEventHandler().handle( + new RMAppRejectedEvent(event.getAppicationId(), t.getMessage())); + } + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerAppSubmitEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerAppSubmitEvent.java new file mode 100644 index 0000000..c3b0116 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerAppSubmitEvent.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.security; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; + + +public class DelegationTokenRenewerAppSubmitEvent extends + DelegationTokenRenewerEvent { + + private Credentials credentials; + private boolean shouldCancelAtEnd; + private boolean isAppRecovered; + + public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId, + Credentials credentails, boolean shouldCancelAtEnd, + boolean isApplicationRecovered, DelegationTokenRenewerEventType type) { + super(appId, type); + this.credentials = credentails; + this.shouldCancelAtEnd = shouldCancelAtEnd; + this.isAppRecovered = isApplicationRecovered; + } + + public Credentials getCredentials() { + return credentials; + } + + public boolean shouldCancelAtEnd() { + return shouldCancelAtEnd; + } + + public boolean isApplicationRecovered() { + return isAppRecovered; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEvent.java new file mode 100644 index 0000000..947b818 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.security; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class DelegationTokenRenewerEvent extends + AbstractEvent { + + private ApplicationId appId; + + public DelegationTokenRenewerEvent(ApplicationId appId, + DelegationTokenRenewerEventType type) { + super(type); + this.appId = appId; + } + + public ApplicationId getAppicationId() { + return appId; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEventType.java new file mode 100644 index 0000000..6074be5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEventType.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.security; + +public enum DelegationTokenRenewerEventType { + VERIFY_AND_START_APPLICATION, + FINISH_APPLICATION +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 81f4bce..35e5d17 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -495,6 +495,10 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer() MockRM rm2 = new TestSecurityMockRM(conf, memStore); rm2.start(); + // Need to wait for a while as now token renewal happens on another thread + // and is asynchronous in nature. + waitForTokensToBeRenewed(rm2); + // verify tokens are properly populated back to rm2 DelegationTokenRenewer Assert.assertEquals(tokenSet, rm2.getRMContext() .getDelegationTokenRenewer().getDelegationTokens()); @@ -504,6 +508,21 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer() rm2.stop(); } + private void waitForTokensToBeRenewed(MockRM rm2) throws Exception { + int waitCnt = 20; + boolean atleastOneAppInNEWState = true; + while (waitCnt-- > 0 && atleastOneAppInNEWState) { + atleastOneAppInNEWState = false; + for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) { + if (rmApp.getState() == RMAppState.NEW) { + Thread.sleep(1000); + atleastOneAppInNEWState = true; + break; + } + } + } + } + @Test public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 98e6ab0..fd36aa0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -21,9 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -32,12 +29,15 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Collections; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -46,34 +46,39 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.security.token.delegation.DelegationKey; -import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; /** * unit test - - * tests addition/deletion/cancelation of renewals of delegation tokens + * tests addition/deletion/cancellation of renewals of delegation tokens * */ +@SuppressWarnings("rawtypes") public class TestDelegationTokenRenewer { private static final Log LOG = LogFactory.getLog(TestDelegationTokenRenewer.class); private static final Text KIND = new Text("TestDelegationTokenRenewer.Token"); + private static BlockingQueue eventQueue; + private static AsyncDispatcher dispatcher; public static class Renewer extends TokenRenewer { private static int counter = 0; private static Token lastRenewed = null; @@ -143,11 +148,19 @@ public static void setUpClass() throws Exception { @Before public void setUp() throws Exception { + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + eventQueue = new LinkedBlockingQueue(); + dispatcher = new AsyncDispatcher(eventQueue); Renewer.reset(); delegationTokenRenewer = new DelegationTokenRenewer(); delegationTokenRenewer.init(conf); RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); + when(mockContext.getDelegationTokenRenewer()).thenReturn( + delegationTokenRenewer); + when(mockContext.getDispatcher()).thenReturn(dispatcher); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); @@ -316,7 +329,7 @@ public void testDTRenewal () throws Exception { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - delegationTokenRenewer.addApplication(applicationId_0, ts, true); + delegationTokenRenewer.addApplication(applicationId_0, ts, true, false); // first 3 initial renewals + 1 real int numberOfExpectedRenewals = 3+1; @@ -355,9 +368,9 @@ public void testDTRenewal () throws Exception { ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); - delegationTokenRenewer.addApplication(applicationId_1, ts, true); + delegationTokenRenewer.addApplication(applicationId_1, ts, true, false); delegationTokenRenewer.applicationFinished(applicationId_1); - + waitForEventsToGetProcessed(); numberOfExpectedRenewals = Renewer.counter; // number of renewals so far try { Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew @@ -390,12 +403,21 @@ public void testInvalidDTWithAddApplication() throws Exception { // register the tokens for renewal ApplicationId appId = BuilderUtils.newApplicationId(0, 0); - try { - delegationTokenRenewer.addApplication(appId, ts, true); - fail("App submission with a cancelled token should have failed"); - } catch (InvalidToken e) { - // expected + delegationTokenRenewer.addApplication(appId, ts, true, false); + int waitCnt = 20; + while (waitCnt-- >0) { + if (!eventQueue.isEmpty()) { + Event evt = eventQueue.take(); + if (evt.getType() == RMAppEventType.APP_REJECTED) { + Assert.assertTrue( + ((RMAppEvent) evt).getApplicationId().equals(appId)); + return; + } + } else { + Thread.sleep(500); + } } + fail("App submission with a cancelled token should have failed"); } /** @@ -425,9 +447,9 @@ public void testDTRenewalWithNoCancel () throws Exception { ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); - delegationTokenRenewer.addApplication(applicationId_1, ts, false); + delegationTokenRenewer.addApplication(applicationId_1, ts, false, false); delegationTokenRenewer.applicationFinished(applicationId_1); - + waitForEventsToGetProcessed(); int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far try { Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew @@ -469,6 +491,9 @@ public void testDTKeepAlive1 () throws Exception { RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); + when(mockContext.getDelegationTokenRenewer()).thenReturn( + localDtr); + when(mockContext.getDispatcher()).thenReturn(dispatcher); InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); @@ -487,16 +512,15 @@ public void testDTKeepAlive1 () throws Exception { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - localDtr.addApplication(applicationId_0, ts, true); + localDtr.addApplication(applicationId_0, ts, true, false); localDtr.applicationFinished(applicationId_0); - - Thread.sleep(3000l); + waitForEventsToGetProcessed(); //Token should still be around. Renewal should not fail. token1.renew(lconf); //Allow the keepalive time to run out - Thread.sleep(6000l); + Thread.sleep(10000l); //The token should have been cancelled at this point. Renewal will fail. try { @@ -533,6 +557,9 @@ public void testDTKeepAlive2() throws Exception { RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); + when(mockContext.getDelegationTokenRenewer()).thenReturn( + localDtr); + when(mockContext.getDispatcher()).thenReturn(dispatcher); InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); @@ -551,22 +578,18 @@ public void testDTKeepAlive2() throws Exception { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - localDtr.addApplication(applicationId_0, ts, true); + localDtr.addApplication(applicationId_0, ts, true, false); localDtr.applicationFinished(applicationId_0); - - Thread.sleep(4000l); - + waitForEventsToGetProcessed(); //Send another keep alive. localDtr.updateKeepAliveApplications(Collections .singletonList(applicationId_0)); //Renewal should not fail. token1.renew(lconf); - //Token should be around after this. Thread.sleep(4500l); //Renewal should not fail. - ~1.5 seconds for keepalive timeout. token1.renew(lconf); - //Allow the keepalive time to run out Thread.sleep(3000l); //The token should have been cancelled at this point. Renewal will fail. @@ -575,61 +598,14 @@ public void testDTKeepAlive2() throws Exception { fail("Renewal of cancelled token should have failed"); } catch (InvalidToken ite) {} } - - @Test(timeout=20000) - public void testConncurrentAddApplication() - throws IOException, InterruptedException, BrokenBarrierException { - final CyclicBarrier startBarrier = new CyclicBarrier(2); - final CyclicBarrier endBarrier = new CyclicBarrier(2); - - // this token uses barriers to block during renew - final Credentials creds1 = new Credentials(); - final Token token1 = mock(Token.class); - creds1.addToken(new Text("token"), token1); - doReturn(true).when(token1).isManaged(); - doAnswer(new Answer() { - public Long answer(InvocationOnMock invocation) - throws InterruptedException, BrokenBarrierException { - startBarrier.await(); - endBarrier.await(); - return Long.MAX_VALUE; - }}).when(token1).renew(any(Configuration.class)); - - // this dummy token fakes renewing - final Credentials creds2 = new Credentials(); - final Token token2 = mock(Token.class); - creds2.addToken(new Text("token"), token2); - doReturn(true).when(token2).isManaged(); - doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class)); - - // fire up the renewer - final DelegationTokenRenewer dtr = new DelegationTokenRenewer(); - dtr.init(conf); - RMContext mockContext = mock(RMContext.class); - ClientRMService mockClientRMService = mock(ClientRMService.class); - when(mockContext.getClientRMService()).thenReturn(mockClientRMService); - InetSocketAddress sockAddr = - InetSocketAddress.createUnresolved("localhost", 1234); - when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); - dtr.setRMContext(mockContext); - dtr.start(); - - // submit a job that blocks during renewal - Thread submitThread = new Thread() { - @Override - public void run() { - try { - dtr.addApplication(mock(ApplicationId.class), creds1, false); - } catch (IOException e) {} - } - }; - submitThread.start(); - - // wait till 1st submit blocks, then submit another - startBarrier.await(); - dtr.addApplication(mock(ApplicationId.class), creds2, false); - // signal 1st to complete - endBarrier.await(); - submitThread.join(); + + + private void waitForEventsToGetProcessed() throws InterruptedException { + int wait = 20; + while (wait-- >0 && !eventQueue.isEmpty()) { + Thread.sleep(1000); + } + Thread.sleep(1000l); } + }