diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 104ef4a..d92589c 100644
--- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -299,4 +299,10 @@
+
+
+
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index e661344..9495e44 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -282,30 +282,28 @@ protected void submitApplication(
this.applicationACLsManager.addApplication(applicationId,
submissionContext.getAMContainerSpec().getApplicationACLs());
- try {
- // Setup tokens for renewal
- if (UserGroupInformation.isSecurityEnabled()) {
- this.rmContext.getDelegationTokenRenewer().addApplication(
- applicationId,parseCredentials(submissionContext),
- submissionContext.getCancelTokensWhenComplete()
- );
+
+ if ( UserGroupInformation.isSecurityEnabled()) {
+ Credentials ts = null;
+ try {
+ ts = parseCredentials(submissionContext);
+ } catch (Throwable t) {
+ LOG.warn("Unable to parse credentials from submission context.", t);
+ // Sending APP_REJECTED is fine, since we assume that the
+ // RMApp is in NEW state and thus we havne't yet informed the
+ // Scheduler about the existence of the application
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppRejectedEvent(applicationId, t.getMessage()));
+ return;
}
- } catch (IOException ie) {
- LOG.warn(
- "Unable to add the application to the delegation token renewer.",
- ie);
- // Sending APP_REJECTED is fine, since we assume that the
- // RMApp is in NEW state and thus we havne't yet informed the
- // Scheduler about the existence of the application
+ this.rmContext.getDelegationTokenRenewer().addApplication(applicationId,
+ ts, submissionContext.getCancelTokensWhenComplete(), isRecovered);
+ } else {
+ // All done, start the RMApp
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppRejectedEvent(applicationId, ie.getMessage()));
- throw RPCUtil.getRemoteException(ie);
+ new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER:
+ RMAppEventType.START));
}
-
- // All done, start the RMApp
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER:
- RMAppEventType.START));
}
private Credentials parseCredentials(ApplicationSubmissionContext application)
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
index a58b917..ad362e0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
@@ -34,7 +34,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,8 +47,13 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import com.google.common.annotations.VisibleForTesting;
@@ -72,6 +76,7 @@
// delegation token canceler thread
private DelegationTokenCancelThread dtCancelThread =
new DelegationTokenCancelThread();
+ private AsyncDispatcher dispatcher;
// managing the list of tokens using Map
// appId=>List
@@ -84,9 +89,6 @@
private long tokenRemovalDelayMs;
private Thread delayedRemovalThread;
- private boolean isServiceStarted = false;
- private List pendingTokenForRenewal =
- new ArrayList();
private boolean tokenKeepAliveEnabled;
@@ -102,6 +104,10 @@ protected synchronized void serviceInit(Configuration conf) throws Exception {
this.tokenRemovalDelayMs =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+ this.dispatcher = new AsyncDispatcher();
+ this.dispatcher.init(conf);
+ this.dispatcher.register(DelegationTokenRenewerEventType.class,
+ new ForwardingEventHandler());
super.serviceInit(conf);
}
@@ -119,11 +125,7 @@ protected void serviceStart() throws Exception {
RMDelegationTokenIdentifier.Renewer.setSecretManager(
rmContext.getRMDelegationTokenSecretManager(),
rmContext.getClientRMService().getBindAddress());
- // Delegation token renewal is delayed until ClientRMService starts. As
- // it is required to short circuit the token renewal calls.
- isServiceStarted = true;
- renewIfServiceIsStarted(pendingTokenForRenewal);
- pendingTokenForRenewal.clear();
+ this.dispatcher.start();
super.serviceStart();
}
@@ -289,48 +291,53 @@ private void addTokenToList(DelegationTokenToRenew t) {
* done else false.
* @throws IOException
*/
+ @SuppressWarnings("unchecked")
public void addApplication(
- ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd)
+ ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd,
+ boolean isApplicationRecovered) {
+ dispatcher.getEventHandler().handle(
+ new DelegationTokenRenewerAppSubmitEvent(applicationId, ts,
+ shouldCancelAtEnd, isApplicationRecovered,
+ DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION));
+ }
+
+ private void addApplication(DelegationTokenRenewerAppSubmitEvent evt)
throws IOException {
+ ApplicationId applicationId = evt.getAppicationId();
+ Credentials ts = evt.getCredentials();
+ boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
if (ts == null) {
- return; //nothing to add
+ return; // nothing to add
}
-
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Registering tokens for renewal for:" +
+ LOG.debug("Registering tokens for renewal for:" +
" appId = " + applicationId);
}
-
- Collection > tokens = ts.getAllTokens();
+
+ Collection> tokens = ts.getAllTokens();
long now = System.currentTimeMillis();
-
+
// find tokens for renewal, but don't add timers until we know
// all renewable tokens are valid
// At RM restart it is safe to assume that all the previously added tokens
// are valid
List tokenList =
new ArrayList();
- for(Token> token : tokens) {
+ for (Token> token : tokens) {
if (token.isManaged()) {
tokenList.add(new DelegationTokenToRenew(applicationId,
token, getConfig(), now, shouldCancelAtEnd));
}
}
- if (!tokenList.isEmpty()){
- renewIfServiceIsStarted(tokenList);
- }
- }
-
- protected void renewIfServiceIsStarted(List dtrs)
- throws IOException {
- if (isServiceStarted) {
+ if (!tokenList.isEmpty()) {
// Renewing token and adding it to timer calls are separated purposefully
// If user provides incorrect token then it should not be added for
// renewal.
- for (DelegationTokenToRenew dtr : dtrs) {
+ for (DelegationTokenToRenew dtr : tokenList) {
renewToken(dtr);
}
- for (DelegationTokenToRenew dtr : dtrs) {
+ for (DelegationTokenToRenew dtr : tokenList) {
addTokenToList(dtr);
setTimerForTokenRenewal(dtr);
if (LOG.isDebugEnabled()) {
@@ -338,11 +345,9 @@ protected void renewIfServiceIsStarted(List dtrs)
+ dtr.token.getService() + " for appId = " + dtr.applicationId);
}
}
- } else {
- pendingTokenForRenewal.addAll(dtrs);
}
}
-
+
/**
* Task - to renew a token
*
@@ -448,15 +453,22 @@ private void removeFailedDelegationToken(DelegationTokenToRenew t) {
* Removing delegation token for completed applications.
* @param applicationId completed application
*/
+ @SuppressWarnings("unchecked")
public void applicationFinished(ApplicationId applicationId) {
+ dispatcher.getEventHandler().handle(
+ new DelegationTokenRenewerEvent(applicationId,
+ DelegationTokenRenewerEventType.FINISH_APPLICATION));
+ }
+
+ private void applicationFinished(DelegationTokenRenewerEvent evt) {
if (!tokenKeepAliveEnabled) {
- removeApplicationFromRenewal(applicationId);
+ removeApplicationFromRenewal(evt.getAppicationId());
} else {
- delayedRemovalMap.put(applicationId, System.currentTimeMillis()
+ delayedRemovalMap.put(evt.getAppicationId(), System.currentTimeMillis()
+ tokenRemovalDelayMs);
}
}
-
+
/**
* Add a list of applications to the keep alive list. If an appId already
* exists, update it's keep-alive time.
@@ -546,4 +558,52 @@ public void run() {
public void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
}
+
+ /**
+ * EventHandler implementation which forward events to the
+ * DelegationTokenRenewer. This hides the EventHandler methods of the store
+ * from its public interface
+ */
+ private final class ForwardingEventHandler
+ implements EventHandler {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void handle(DelegationTokenRenewerEvent event) {
+ if (event.getType().equals(
+ DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION)) {
+ DelegationTokenRenewerAppSubmitEvent appSubmitEvt =
+ (DelegationTokenRenewerAppSubmitEvent) event;
+ handleDTRenewerEvent(appSubmitEvt);
+ } else if (event.getType().equals(
+ DelegationTokenRenewerEventType.FINISH_APPLICATION)) {
+ rmContext.getDelegationTokenRenewer().applicationFinished(event);
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ private void handleDTRenewerEvent(
+ DelegationTokenRenewerAppSubmitEvent event) {
+ try {
+ // Setup tokens for renewal
+ if (UserGroupInformation.isSecurityEnabled()) {
+ rmContext.getDelegationTokenRenewer().addApplication(event);
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(event.getAppicationId(),
+ event.isApplicationRecovered() ? RMAppEventType.RECOVER
+ : RMAppEventType.START));
+ }
+ } catch (Throwable t) {
+ LOG.warn(
+ "Unable to add the application to the delegation token renewer.",
+ t);
+ // Sending APP_REJECTED is fine, since we assume that the
+ // RMApp is in NEW state and thus we havne't yet informed the
+ // Scheduler about the existence of the application
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppRejectedEvent(event.getAppicationId(), t.getMessage()));
+ }
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerAppSubmitEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerAppSubmitEvent.java
new file mode 100644
index 0000000..c3b0116
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerAppSubmitEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+
+public class DelegationTokenRenewerAppSubmitEvent extends
+ DelegationTokenRenewerEvent {
+
+ private Credentials credentials;
+ private boolean shouldCancelAtEnd;
+ private boolean isAppRecovered;
+
+ public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
+ Credentials credentails, boolean shouldCancelAtEnd,
+ boolean isApplicationRecovered, DelegationTokenRenewerEventType type) {
+ super(appId, type);
+ this.credentials = credentails;
+ this.shouldCancelAtEnd = shouldCancelAtEnd;
+ this.isAppRecovered = isApplicationRecovered;
+ }
+
+ public Credentials getCredentials() {
+ return credentials;
+ }
+
+ public boolean shouldCancelAtEnd() {
+ return shouldCancelAtEnd;
+ }
+
+ public boolean isApplicationRecovered() {
+ return isAppRecovered;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEvent.java
new file mode 100644
index 0000000..947b818
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class DelegationTokenRenewerEvent extends
+ AbstractEvent {
+
+ private ApplicationId appId;
+
+ public DelegationTokenRenewerEvent(ApplicationId appId,
+ DelegationTokenRenewerEventType type) {
+ super(type);
+ this.appId = appId;
+ }
+
+ public ApplicationId getAppicationId() {
+ return appId;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEventType.java
new file mode 100644
index 0000000..6074be5
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEventType.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+public enum DelegationTokenRenewerEventType {
+ VERIFY_AND_START_APPLICATION,
+ FINISH_APPLICATION
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 81f4bce..35e5d17 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -495,6 +495,10 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer()
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
rm2.start();
+ // Need to wait for a while as now token renewal happens on another thread
+ // and is asynchronous in nature.
+ waitForTokensToBeRenewed(rm2);
+
// verify tokens are properly populated back to rm2 DelegationTokenRenewer
Assert.assertEquals(tokenSet, rm2.getRMContext()
.getDelegationTokenRenewer().getDelegationTokens());
@@ -504,6 +508,21 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer()
rm2.stop();
}
+ private void waitForTokensToBeRenewed(MockRM rm2) throws Exception {
+ int waitCnt = 20;
+ boolean atleastOneAppInNEWState = true;
+ while (waitCnt-- > 0 && atleastOneAppInNEWState) {
+ atleastOneAppInNEWState = false;
+ for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) {
+ if (rmApp.getState() == RMAppState.NEW) {
+ Thread.sleep(1000);
+ atleastOneAppInNEWState = true;
+ break;
+ }
+ }
+ }
+ }
+
@Test
public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
index 98e6ab0..fd36aa0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
@@ -21,9 +21,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -32,12 +29,15 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -46,34 +46,39 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
/**
* unit test -
- * tests addition/deletion/cancelation of renewals of delegation tokens
+ * tests addition/deletion/cancellation of renewals of delegation tokens
*
*/
+@SuppressWarnings("rawtypes")
public class TestDelegationTokenRenewer {
private static final Log LOG =
LogFactory.getLog(TestDelegationTokenRenewer.class);
private static final Text KIND = new Text("TestDelegationTokenRenewer.Token");
+ private static BlockingQueue eventQueue;
+ private static AsyncDispatcher dispatcher;
public static class Renewer extends TokenRenewer {
private static int counter = 0;
private static Token> lastRenewed = null;
@@ -143,11 +148,19 @@ public static void setUpClass() throws Exception {
@Before
public void setUp() throws Exception {
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ eventQueue = new LinkedBlockingQueue();
+ dispatcher = new AsyncDispatcher(eventQueue);
Renewer.reset();
delegationTokenRenewer = new DelegationTokenRenewer();
delegationTokenRenewer.init(conf);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
+ when(mockContext.getDelegationTokenRenewer()).thenReturn(
+ delegationTokenRenewer);
+ when(mockContext.getDispatcher()).thenReturn(dispatcher);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
@@ -316,7 +329,7 @@ public void testDTRenewal () throws Exception {
// register the tokens for renewal
ApplicationId applicationId_0 =
BuilderUtils.newApplicationId(0, 0);
- delegationTokenRenewer.addApplication(applicationId_0, ts, true);
+ delegationTokenRenewer.addApplication(applicationId_0, ts, true, false);
// first 3 initial renewals + 1 real
int numberOfExpectedRenewals = 3+1;
@@ -355,9 +368,9 @@ public void testDTRenewal () throws Exception {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
- delegationTokenRenewer.addApplication(applicationId_1, ts, true);
+ delegationTokenRenewer.addApplication(applicationId_1, ts, true, false);
delegationTokenRenewer.applicationFinished(applicationId_1);
-
+ waitForEventsToGetProcessed();
numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
@@ -390,12 +403,21 @@ public void testInvalidDTWithAddApplication() throws Exception {
// register the tokens for renewal
ApplicationId appId = BuilderUtils.newApplicationId(0, 0);
- try {
- delegationTokenRenewer.addApplication(appId, ts, true);
- fail("App submission with a cancelled token should have failed");
- } catch (InvalidToken e) {
- // expected
+ delegationTokenRenewer.addApplication(appId, ts, true, false);
+ int waitCnt = 20;
+ while (waitCnt-- >0) {
+ if (!eventQueue.isEmpty()) {
+ Event evt = eventQueue.take();
+ if (evt.getType() == RMAppEventType.APP_REJECTED) {
+ Assert.assertTrue(
+ ((RMAppEvent) evt).getApplicationId().equals(appId));
+ return;
+ }
+ } else {
+ Thread.sleep(500);
+ }
}
+ fail("App submission with a cancelled token should have failed");
}
/**
@@ -425,9 +447,9 @@ public void testDTRenewalWithNoCancel () throws Exception {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
- delegationTokenRenewer.addApplication(applicationId_1, ts, false);
+ delegationTokenRenewer.addApplication(applicationId_1, ts, false, false);
delegationTokenRenewer.applicationFinished(applicationId_1);
-
+ waitForEventsToGetProcessed();
int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
@@ -469,6 +491,9 @@ public void testDTKeepAlive1 () throws Exception {
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+ when(mockContext.getDelegationTokenRenewer()).thenReturn(
+ localDtr);
+ when(mockContext.getDispatcher()).thenReturn(dispatcher);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
@@ -487,16 +512,15 @@ public void testDTKeepAlive1 () throws Exception {
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
- localDtr.addApplication(applicationId_0, ts, true);
+ localDtr.addApplication(applicationId_0, ts, true, false);
localDtr.applicationFinished(applicationId_0);
-
- Thread.sleep(3000l);
+ waitForEventsToGetProcessed();
//Token should still be around. Renewal should not fail.
token1.renew(lconf);
//Allow the keepalive time to run out
- Thread.sleep(6000l);
+ Thread.sleep(10000l);
//The token should have been cancelled at this point. Renewal will fail.
try {
@@ -533,6 +557,9 @@ public void testDTKeepAlive2() throws Exception {
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+ when(mockContext.getDelegationTokenRenewer()).thenReturn(
+ localDtr);
+ when(mockContext.getDispatcher()).thenReturn(dispatcher);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
@@ -551,22 +578,18 @@ public void testDTKeepAlive2() throws Exception {
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
- localDtr.addApplication(applicationId_0, ts, true);
+ localDtr.addApplication(applicationId_0, ts, true, false);
localDtr.applicationFinished(applicationId_0);
-
- Thread.sleep(4000l);
-
+ waitForEventsToGetProcessed();
//Send another keep alive.
localDtr.updateKeepAliveApplications(Collections
.singletonList(applicationId_0));
//Renewal should not fail.
token1.renew(lconf);
-
//Token should be around after this.
Thread.sleep(4500l);
//Renewal should not fail. - ~1.5 seconds for keepalive timeout.
token1.renew(lconf);
-
//Allow the keepalive time to run out
Thread.sleep(3000l);
//The token should have been cancelled at this point. Renewal will fail.
@@ -575,61 +598,14 @@ public void testDTKeepAlive2() throws Exception {
fail("Renewal of cancelled token should have failed");
} catch (InvalidToken ite) {}
}
-
- @Test(timeout=20000)
- public void testConncurrentAddApplication()
- throws IOException, InterruptedException, BrokenBarrierException {
- final CyclicBarrier startBarrier = new CyclicBarrier(2);
- final CyclicBarrier endBarrier = new CyclicBarrier(2);
-
- // this token uses barriers to block during renew
- final Credentials creds1 = new Credentials();
- final Token> token1 = mock(Token.class);
- creds1.addToken(new Text("token"), token1);
- doReturn(true).when(token1).isManaged();
- doAnswer(new Answer() {
- public Long answer(InvocationOnMock invocation)
- throws InterruptedException, BrokenBarrierException {
- startBarrier.await();
- endBarrier.await();
- return Long.MAX_VALUE;
- }}).when(token1).renew(any(Configuration.class));
-
- // this dummy token fakes renewing
- final Credentials creds2 = new Credentials();
- final Token> token2 = mock(Token.class);
- creds2.addToken(new Text("token"), token2);
- doReturn(true).when(token2).isManaged();
- doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
-
- // fire up the renewer
- final DelegationTokenRenewer dtr = new DelegationTokenRenewer();
- dtr.init(conf);
- RMContext mockContext = mock(RMContext.class);
- ClientRMService mockClientRMService = mock(ClientRMService.class);
- when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
- InetSocketAddress sockAddr =
- InetSocketAddress.createUnresolved("localhost", 1234);
- when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
- dtr.setRMContext(mockContext);
- dtr.start();
-
- // submit a job that blocks during renewal
- Thread submitThread = new Thread() {
- @Override
- public void run() {
- try {
- dtr.addApplication(mock(ApplicationId.class), creds1, false);
- } catch (IOException e) {}
- }
- };
- submitThread.start();
-
- // wait till 1st submit blocks, then submit another
- startBarrier.await();
- dtr.addApplication(mock(ApplicationId.class), creds2, false);
- // signal 1st to complete
- endBarrier.await();
- submitThread.join();
+
+
+ private void waitForEventsToGetProcessed() throws InterruptedException {
+ int wait = 20;
+ while (wait-- >0 && !eventQueue.isEmpty()) {
+ Thread.sleep(1000);
+ }
+ Thread.sleep(1000l);
}
+
}