diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 104ef4a..d92589c 100644
--- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -299,4 +299,10 @@
+
+
+
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 09f6b6e..078f827 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -489,6 +489,11 @@
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
30000l;
+
+ /** Delegation Token renewer thread count */
+ public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT =
+ RM_PREFIX + "delegation-token-renewer.thread-count";
+ public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 5;
/** Whether to enable log aggregation */
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 90c88c2..99c73a1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -317,7 +317,7 @@ public SubmitApplicationResponse submitApplication(
try {
// call RMAppManager to submit application directly
rmAppManager.submitApplication(submissionContext,
- System.currentTimeMillis(), false, user);
+ System.currentTimeMillis(), user);
LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 55b748d..1d6f04d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -232,32 +233,94 @@ protected synchronized void checkAppNumCompletedLimit() {
@SuppressWarnings("unchecked")
protected void submitApplication(
+ ApplicationSubmissionContext submissionContext,
+ long submitTime, String user) throws YarnException {
+ ApplicationId applicationId = submissionContext.getApplicationId();
+ validateResourceRequest(submissionContext);
+ createAndPopulateNewRMApp(submissionContext, submitTime, user,
+ applicationId);
+
+ // Inform the ACLs Manager
+ this.applicationACLsManager.addApplication(applicationId,
+ submissionContext.getAMContainerSpec().getApplicationACLs());
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ Credentials credentials = null;
+ try {
+ credentials = parseCredentials(submissionContext);
+ } catch (Exception e) {
+ LOG.warn(
+ "Unable to parse credentials.", e);
+ // Sending APP_REJECTED is fine, since we assume that the
+ // RMApp is in NEW state and thus we havne't yet informed the
+ // Scheduler about the existence of the application
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppRejectedEvent(applicationId, e.getMessage()));
+ throw RPCUtil.getRemoteException(e);
+ }
+ this.rmContext.getDelegationTokenRenewer().addApplication(
+ applicationId, credentials,
+ submissionContext.getCancelTokensWhenComplete(), false);
+ } else {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.START));
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void submitRecoveredApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
- boolean isRecovered, String user) throws YarnException {
+ String user, RMState state) throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
- // Validation of the ApplicationSubmissionContext needs to be completed
- // here. Only those fields that are dependent on RM's configuration are
- // checked here as they have to be validated whether they are part of new
- // submission or just being recovered.
+ validateResourceRequest(submissionContext);
+ RMAppImpl application =
+ createAndPopulateNewRMApp(submissionContext, submitTime, user,
+ applicationId);
+ // Inform the ACLs Manager
+ this.applicationACLsManager.addApplication(applicationId,
+ submissionContext.getAMContainerSpec().getApplicationACLs());
- // Check whether AM resource requirements are within required limits
- if (!submissionContext.getUnmanagedAM()) {
- ResourceRequest amReq = BuilderUtils.newResourceRequest(
- RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
- submissionContext.getResource(), 1);
+ recoverApplication(state, application);
+ RMAppState rmAppState =
+ state.getApplicationState().get(applicationId).getState();
+ if (isApplicationInFinalState(rmAppState)) {
+ // don't recover finished applications
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(applicationId, RMAppEventType.RECOVER));
+ return;
+ }
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ Credentials credentials = null;
try {
- SchedulerUtils.validateResourceRequest(amReq,
- scheduler.getMaximumResourceCapability());
- } catch (InvalidResourceRequestException e) {
- LOG.warn("RM app submission failed in validating AM resource request"
- + " for application " + applicationId, e);
- throw e;
+ credentials = parseCredentials(submissionContext);
+ } catch (Exception e) {
+ LOG.warn(
+ "Unable to parse credentials.", e);
+ // Sending APP_REJECTED is fine, since we assume that the
+ // RMApp is in NEW state and thus we havne't yet informed the
+ // Scheduler about the existence of the application
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppRejectedEvent(applicationId, e.getMessage()));
+ throw RPCUtil.getRemoteException(e);
}
+ this.rmContext.getDelegationTokenRenewer().addApplication(
+ applicationId, credentials,
+ submissionContext.getCancelTokensWhenComplete(), true);
+ } else {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.RECOVER));
}
+ }
+ private RMAppImpl createAndPopulateNewRMApp(
+ ApplicationSubmissionContext submissionContext,
+ long submitTime, String user, ApplicationId applicationId)
+ throws YarnException {
// Create RMApp
- RMApp application =
+ RMAppImpl application =
new RMAppImpl(applicationId, rmContext, this.conf,
submissionContext.getApplicationName(), user,
submissionContext.getQueue(),
@@ -274,35 +337,49 @@ protected void submitApplication(
LOG.warn(message);
throw RPCUtil.getRemoteException(message);
}
+ return application;
+ }
- // Inform the ACLs Manager
- this.applicationACLsManager.addApplication(applicationId,
- submissionContext.getAMContainerSpec().getApplicationACLs());
+ private void validateResourceRequest(
+ ApplicationSubmissionContext submissionContext)
+ throws InvalidResourceRequestException {
+ // Validation of the ApplicationSubmissionContext needs to be completed
+ // here. Only those fields that are dependent on RM's configuration are
+ // checked here as they have to be validated whether they are part of new
+ // submission or just being recovered.
- try {
- // Setup tokens for renewal
- if (UserGroupInformation.isSecurityEnabled()) {
- this.rmContext.getDelegationTokenRenewer().addApplication(
- applicationId,parseCredentials(submissionContext),
- submissionContext.getCancelTokensWhenComplete()
- );
+ // Check whether AM resource requirements are within required limits
+ if (!submissionContext.getUnmanagedAM()) {
+ ResourceRequest amReq = BuilderUtils.newResourceRequest(
+ RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
+ submissionContext.getResource(), 1);
+ try {
+ SchedulerUtils.validateResourceRequest(amReq,
+ scheduler.getMaximumResourceCapability());
+ } catch (InvalidResourceRequestException e) {
+ LOG.warn("RM app submission failed in validating AM resource request"
+ + " for application " + submissionContext.getApplicationId(), e);
+ throw e;
}
- } catch (IOException ie) {
- LOG.warn(
- "Unable to add the application to the delegation token renewer.",
- ie);
- // Sending APP_REJECTED is fine, since we assume that the
- // RMApp is in NEW state and thus we havne't yet informed the
- // Scheduler about the existence of the application
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppRejectedEvent(applicationId, ie.getMessage()));
- throw RPCUtil.getRemoteException(ie);
}
+ }
- if (!isRecovered) {
- // All done, start the RMApp
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId, RMAppEventType.START));
+ private void recoverApplication(RMState state, RMAppImpl application)
+ throws YarnException {
+ try {
+ application.recover(state);
+ } catch (Exception e) {
+ LOG.error("Error recovering application", e);
+ throw new YarnException(e);
+ }
+ }
+
+ private boolean isApplicationInFinalState(RMAppState rmAppState) {
+ if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED
+ || rmAppState == RMAppState.KILLED) {
+ return true;
+ } else {
+ return false;
}
}
@@ -328,17 +405,9 @@ public void recover(RMState state) throws Exception {
LOG.info("Recovering " + appStates.size() + " applications");
for (ApplicationState appState : appStates.values()) {
LOG.info("Recovering application " + appState.getAppId());
- submitApplication(appState.getApplicationSubmissionContext(),
- appState.getSubmitTime(), true, appState.getUser());
- // re-populate attempt information in application
- RMAppImpl appImpl =
- (RMAppImpl) rmContext.getRMApps().get(appState.getAppId());
- appImpl.recover(state);
- // Recover the app synchronously, as otherwise client is possible to see
- // the application not recovered before it is actually recovered because
- // ClientRMService is already started at this point of time.
- appImpl.handle(new RMAppEvent(appImpl.getApplicationId(),
- RMAppEventType.RECOVER));
+
+ submitRecoveredApplication(appState.getApplicationSubmissionContext(),
+ appState.getSubmitTime(), appState.getUser(), state);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
index a58b917..eb0fabe 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
@@ -33,8 +33,11 @@
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -50,8 +53,12 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Service to renew application delegation tokens.
@@ -72,7 +79,12 @@
// delegation token canceler thread
private DelegationTokenCancelThread dtCancelThread =
new DelegationTokenCancelThread();
+ private ExecutorService renewerService;
+ // Only for testing
+ // Used to check how many events are getting processed.
+ private AtomicInteger renewerCount;
+
// managing the list of tokens using Map
// appId=>List
private Set delegationTokens =
@@ -85,8 +97,7 @@
private Thread delayedRemovalThread;
private boolean isServiceStarted = false;
- private List pendingTokenForRenewal =
- new ArrayList();
+ private LinkedBlockingQueue pendingEventQueue;
private boolean tokenKeepAliveEnabled;
@@ -102,6 +113,15 @@ protected synchronized void serviceInit(Configuration conf) throws Exception {
this.tokenRemovalDelayMs =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+ int nThreads = conf.getInt(
+ YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT);
+ ThreadFactory tf = new ThreadFactoryBuilder()
+ .setNameFormat("DelegationTokenRenewer #%d")
+ .build();
+ renewerService = Executors.newFixedThreadPool(nThreads, tf);
+ pendingEventQueue = new LinkedBlockingQueue();
+ renewerCount = new AtomicInteger(0);
super.serviceInit(conf);
}
@@ -119,21 +139,30 @@ protected void serviceStart() throws Exception {
RMDelegationTokenIdentifier.Renewer.setSecretManager(
rmContext.getRMDelegationTokenSecretManager(),
rmContext.getClientRMService().getBindAddress());
- // Delegation token renewal is delayed until ClientRMService starts. As
- // it is required to short circuit the token renewal calls.
isServiceStarted = true;
- renewIfServiceIsStarted(pendingTokenForRenewal);
- pendingTokenForRenewal.clear();
+ while(!pendingEventQueue.isEmpty()) {
+ processDelegationTokenRewewerEvent(pendingEventQueue.take());
+ }
super.serviceStart();
}
+ private void processDelegationTokenRewewerEvent(
+ DelegationTokenRenewerEvent evt) {
+ if (isServiceStarted) {
+ renewerCount.incrementAndGet();
+ renewerService.execute(new DelegationTokenRenewerThread(evt));
+ } else {
+ pendingEventQueue.add(evt);
+ }
+ }
+
@Override
protected void serviceStop() {
if (renewalTimer != null) {
renewalTimer.cancel();
}
delegationTokens.clear();
-
+ this.renewerService.shutdown();
dtCancelThread.interrupt();
try {
dtCancelThread.join(1000);
@@ -290,47 +319,51 @@ private void addTokenToList(DelegationTokenToRenew t) {
* @throws IOException
*/
public void addApplication(
- ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd)
+ ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd,
+ boolean isApplicationRecovered) {
+ processDelegationTokenRewewerEvent(new DelegationTokenRenewerAppSubmitEvent(
+ applicationId, ts,
+ shouldCancelAtEnd, isApplicationRecovered,
+ DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION));
+ }
+
+ private void addApplication(DelegationTokenRenewerAppSubmitEvent evt)
throws IOException {
+ ApplicationId applicationId = evt.getAppicationId();
+ Credentials ts = evt.getCredentials();
+ boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
if (ts == null) {
- return; //nothing to add
+ return; // nothing to add
}
-
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Registering tokens for renewal for:" +
+ LOG.debug("Registering tokens for renewal for:" +
" appId = " + applicationId);
}
-
- Collection > tokens = ts.getAllTokens();
+
+ Collection> tokens = ts.getAllTokens();
long now = System.currentTimeMillis();
-
+
// find tokens for renewal, but don't add timers until we know
// all renewable tokens are valid
// At RM restart it is safe to assume that all the previously added tokens
// are valid
List tokenList =
new ArrayList();
- for(Token> token : tokens) {
+ for (Token> token : tokens) {
if (token.isManaged()) {
tokenList.add(new DelegationTokenToRenew(applicationId,
token, getConfig(), now, shouldCancelAtEnd));
}
}
- if (!tokenList.isEmpty()){
- renewIfServiceIsStarted(tokenList);
- }
- }
-
- protected void renewIfServiceIsStarted(List dtrs)
- throws IOException {
- if (isServiceStarted) {
+ if (!tokenList.isEmpty()) {
// Renewing token and adding it to timer calls are separated purposefully
// If user provides incorrect token then it should not be added for
// renewal.
- for (DelegationTokenToRenew dtr : dtrs) {
+ for (DelegationTokenToRenew dtr : tokenList) {
renewToken(dtr);
}
- for (DelegationTokenToRenew dtr : dtrs) {
+ for (DelegationTokenToRenew dtr : tokenList) {
addTokenToList(dtr);
setTimerForTokenRenewal(dtr);
if (LOG.isDebugEnabled()) {
@@ -338,11 +371,9 @@ protected void renewIfServiceIsStarted(List dtrs)
+ dtr.token.getService() + " for appId = " + dtr.applicationId);
}
}
- } else {
- pendingTokenForRenewal.addAll(dtrs);
}
}
-
+
/**
* Task - to renew a token
*
@@ -449,14 +480,20 @@ private void removeFailedDelegationToken(DelegationTokenToRenew t) {
* @param applicationId completed application
*/
public void applicationFinished(ApplicationId applicationId) {
+ processDelegationTokenRewewerEvent(new DelegationTokenRenewerEvent(
+ applicationId,
+ DelegationTokenRenewerEventType.FINISH_APPLICATION));
+ }
+
+ private void applicationFinished(DelegationTokenRenewerEvent evt) {
if (!tokenKeepAliveEnabled) {
- removeApplicationFromRenewal(applicationId);
+ removeApplicationFromRenewal(evt.getAppicationId());
} else {
- delayedRemovalMap.put(applicationId, System.currentTimeMillis()
+ delayedRemovalMap.put(evt.getAppicationId(), System.currentTimeMillis()
+ tokenRemovalDelayMs);
}
}
-
+
/**
* Add a list of applications to the keep alive list. If an appId already
* exists, update it's keep-alive time.
@@ -546,4 +583,63 @@ public void run() {
public void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
}
+
+ /**
+ * EventHandler implementation which forward events to the
+ * DelegationTokenRenewer. This hides the EventHandler methods of the store
+ * from its public interface
+ */
+ private final class DelegationTokenRenewerThread
+ implements Runnable {
+
+ private DelegationTokenRenewerEvent evt;
+
+ public DelegationTokenRenewerThread(DelegationTokenRenewerEvent evt) {
+ this.evt = evt;
+ }
+
+ @Override
+ public void run() {
+ if (evt.getType().equals(
+ DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION)
+ && evt instanceof DelegationTokenRenewerAppSubmitEvent) {
+ DelegationTokenRenewerAppSubmitEvent appSubmitEvt =
+ (DelegationTokenRenewerAppSubmitEvent) evt;
+ handleDTRenewerEvent(appSubmitEvt);
+ } else if (evt.getType().equals(
+ DelegationTokenRenewerEventType.FINISH_APPLICATION)) {
+ rmContext.getDelegationTokenRenewer().applicationFinished(evt);
+ }
+ rmContext.getDelegationTokenRenewer().renewerCount.decrementAndGet();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void handleDTRenewerEvent(
+ DelegationTokenRenewerAppSubmitEvent event) {
+ try {
+ // Setup tokens for renewal
+ if (UserGroupInformation.isSecurityEnabled()) {
+ rmContext.getDelegationTokenRenewer().addApplication(event);
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(event.getAppicationId(),
+ event.isApplicationRecovered() ? RMAppEventType.RECOVER
+ : RMAppEventType.START));
+ }
+ } catch (Throwable t) {
+ LOG.warn(
+ "Unable to add the application to the delegation token renewer.",
+ t);
+ // Sending APP_REJECTED is fine, since we assume that the
+ // RMApp is in NEW state and thus we havne't yet informed the
+ // Scheduler about the existence of the application
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppRejectedEvent(event.getAppicationId(), t.getMessage()));
+ }
+ }
+ }
+
+ //Only for Testing
+ public int getInProcessDelegationTokenRenewerEventsCount() {
+ return this.renewerCount.get();
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerAppSubmitEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerAppSubmitEvent.java
new file mode 100644
index 0000000..c3b0116
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerAppSubmitEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+
+public class DelegationTokenRenewerAppSubmitEvent extends
+ DelegationTokenRenewerEvent {
+
+ private Credentials credentials;
+ private boolean shouldCancelAtEnd;
+ private boolean isAppRecovered;
+
+ public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
+ Credentials credentails, boolean shouldCancelAtEnd,
+ boolean isApplicationRecovered, DelegationTokenRenewerEventType type) {
+ super(appId, type);
+ this.credentials = credentails;
+ this.shouldCancelAtEnd = shouldCancelAtEnd;
+ this.isAppRecovered = isApplicationRecovered;
+ }
+
+ public Credentials getCredentials() {
+ return credentials;
+ }
+
+ public boolean shouldCancelAtEnd() {
+ return shouldCancelAtEnd;
+ }
+
+ public boolean isApplicationRecovered() {
+ return isAppRecovered;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEvent.java
new file mode 100644
index 0000000..947b818
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class DelegationTokenRenewerEvent extends
+ AbstractEvent {
+
+ private ApplicationId appId;
+
+ public DelegationTokenRenewerEvent(ApplicationId appId,
+ DelegationTokenRenewerEventType type) {
+ super(type);
+ this.appId = appId;
+ }
+
+ public ApplicationId getAppicationId() {
+ return appId;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEventType.java
new file mode 100644
index 0000000..6074be5
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEventType.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+public enum DelegationTokenRenewerEventType {
+ VERIFY_AND_START_APPLICATION,
+ FINISH_APPLICATION
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index 6698412..120be49 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -172,7 +172,7 @@ public void submitApplication(
ApplicationSubmissionContext submissionContext, String user)
throws YarnException {
super.submitApplication(submissionContext, System.currentTimeMillis(),
- false, user);
+ user);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 97f51a2..36dcd38 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -817,6 +817,10 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer()
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
rm2.start();
+ // Need to wait for a while as now token renewal happens on another thread
+ // and is asynchronous in nature.
+ waitForTokensToBeRenewed(rm2);
+
// verify tokens are properly populated back to rm2 DelegationTokenRenewer
Assert.assertEquals(tokenSet, rm2.getRMContext()
.getDelegationTokenRenewer().getDelegationTokens());
@@ -826,6 +830,21 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer()
rm2.stop();
}
+ private void waitForTokensToBeRenewed(MockRM rm2) throws Exception {
+ int waitCnt = 20;
+ boolean atleastOneAppInNEWState = true;
+ while (waitCnt-- > 0 && atleastOneAppInNEWState) {
+ atleastOneAppInNEWState = false;
+ for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) {
+ if (rmApp.getState() == RMAppState.NEW) {
+ Thread.sleep(1000);
+ atleastOneAppInNEWState = true;
+ break;
+ }
+ }
+ }
+ }
+
@Test
public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
index 98e6ab0..66e67a9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
@@ -21,8 +21,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -31,13 +31,21 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -46,16 +54,29 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Before;
@@ -66,14 +87,17 @@
/**
* unit test -
- * tests addition/deletion/cancelation of renewals of delegation tokens
+ * tests addition/deletion/cancellation of renewals of delegation tokens
*
*/
+@SuppressWarnings("rawtypes")
public class TestDelegationTokenRenewer {
private static final Log LOG =
LogFactory.getLog(TestDelegationTokenRenewer.class);
private static final Text KIND = new Text("TestDelegationTokenRenewer.Token");
+ private static BlockingQueue eventQueue;
+ private static AsyncDispatcher dispatcher;
public static class Renewer extends TokenRenewer {
private static int counter = 0;
private static Token> lastRenewed = null;
@@ -143,11 +167,19 @@ public static void setUpClass() throws Exception {
@Before
public void setUp() throws Exception {
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ eventQueue = new LinkedBlockingQueue();
+ dispatcher = new AsyncDispatcher(eventQueue);
Renewer.reset();
delegationTokenRenewer = new DelegationTokenRenewer();
delegationTokenRenewer.init(conf);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
+ when(mockContext.getDelegationTokenRenewer()).thenReturn(
+ delegationTokenRenewer);
+ when(mockContext.getDispatcher()).thenReturn(dispatcher);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
@@ -316,8 +348,9 @@ public void testDTRenewal () throws Exception {
// register the tokens for renewal
ApplicationId applicationId_0 =
BuilderUtils.newApplicationId(0, 0);
- delegationTokenRenewer.addApplication(applicationId_0, ts, true);
-
+ delegationTokenRenewer.addApplication(applicationId_0, ts, true, false);
+ waitForEventsToGetProcessed(delegationTokenRenewer);
+
// first 3 initial renewals + 1 real
int numberOfExpectedRenewals = 3+1;
@@ -355,9 +388,10 @@ public void testDTRenewal () throws Exception {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
- delegationTokenRenewer.addApplication(applicationId_1, ts, true);
+ delegationTokenRenewer.addApplication(applicationId_1, ts, true, false);
+ waitForEventsToGetProcessed(delegationTokenRenewer);
delegationTokenRenewer.applicationFinished(applicationId_1);
-
+ waitForEventsToGetProcessed(delegationTokenRenewer);
numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
@@ -390,12 +424,21 @@ public void testInvalidDTWithAddApplication() throws Exception {
// register the tokens for renewal
ApplicationId appId = BuilderUtils.newApplicationId(0, 0);
- try {
- delegationTokenRenewer.addApplication(appId, ts, true);
- fail("App submission with a cancelled token should have failed");
- } catch (InvalidToken e) {
- // expected
+ delegationTokenRenewer.addApplication(appId, ts, true, false);
+ int waitCnt = 20;
+ while (waitCnt-- >0) {
+ if (!eventQueue.isEmpty()) {
+ Event evt = eventQueue.take();
+ if (evt.getType() == RMAppEventType.APP_REJECTED) {
+ Assert.assertTrue(
+ ((RMAppEvent) evt).getApplicationId().equals(appId));
+ return;
+ }
+ } else {
+ Thread.sleep(500);
+ }
}
+ fail("App submission with a cancelled token should have failed");
}
/**
@@ -425,9 +468,10 @@ public void testDTRenewalWithNoCancel () throws Exception {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
- delegationTokenRenewer.addApplication(applicationId_1, ts, false);
+ delegationTokenRenewer.addApplication(applicationId_1, ts, false, false);
+ waitForEventsToGetProcessed(delegationTokenRenewer);
delegationTokenRenewer.applicationFinished(applicationId_1);
-
+ waitForEventsToGetProcessed(delegationTokenRenewer);
int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
@@ -469,6 +513,9 @@ public void testDTKeepAlive1 () throws Exception {
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+ when(mockContext.getDelegationTokenRenewer()).thenReturn(
+ localDtr);
+ when(mockContext.getDispatcher()).thenReturn(dispatcher);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
@@ -487,16 +534,25 @@ public void testDTKeepAlive1 () throws Exception {
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
- localDtr.addApplication(applicationId_0, ts, true);
+ localDtr.addApplication(applicationId_0, ts, true, false);
+ waitForEventsToGetProcessed(localDtr);
+ if (!eventQueue.isEmpty()){
+ Event evt = eventQueue.take();
+ if (evt instanceof RMAppEvent) {
+ Assert.assertEquals(((RMAppEvent)evt).getType(), RMAppEventType.START);
+ } else {
+ fail("RMAppEvent.START was expected!!");
+ }
+ }
+
localDtr.applicationFinished(applicationId_0);
-
- Thread.sleep(3000l);
+ waitForEventsToGetProcessed(localDtr);
//Token should still be around. Renewal should not fail.
token1.renew(lconf);
//Allow the keepalive time to run out
- Thread.sleep(6000l);
+ Thread.sleep(10000l);
//The token should have been cancelled at this point. Renewal will fail.
try {
@@ -533,6 +589,9 @@ public void testDTKeepAlive2() throws Exception {
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+ when(mockContext.getDelegationTokenRenewer()).thenReturn(
+ localDtr);
+ when(mockContext.getDispatcher()).thenReturn(dispatcher);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
@@ -551,22 +610,18 @@ public void testDTKeepAlive2() throws Exception {
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
- localDtr.addApplication(applicationId_0, ts, true);
+ localDtr.addApplication(applicationId_0, ts, true, false);
localDtr.applicationFinished(applicationId_0);
-
- Thread.sleep(4000l);
-
+ waitForEventsToGetProcessed(delegationTokenRenewer);
//Send another keep alive.
localDtr.updateKeepAliveApplications(Collections
.singletonList(applicationId_0));
//Renewal should not fail.
token1.renew(lconf);
-
//Token should be around after this.
Thread.sleep(4500l);
//Renewal should not fail. - ~1.5 seconds for keepalive timeout.
token1.renew(lconf);
-
//Allow the keepalive time to run out
Thread.sleep(3000l);
//The token should have been cancelled at this point. Renewal will fail.
@@ -575,61 +630,99 @@ public void testDTKeepAlive2() throws Exception {
fail("Renewal of cancelled token should have failed");
} catch (InvalidToken ite) {}
}
+
+
+ private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr)
+ throws InterruptedException {
+ int wait = 20;
+ while (wait-- > 0
+ && dtr.getInProcessDelegationTokenRenewerEventsCount() > 0) {
+ Thread.sleep(200);
+ }
+ }
- @Test(timeout=20000)
- public void testConncurrentAddApplication()
- throws IOException, InterruptedException, BrokenBarrierException {
- final CyclicBarrier startBarrier = new CyclicBarrier(2);
- final CyclicBarrier endBarrier = new CyclicBarrier(2);
-
- // this token uses barriers to block during renew
- final Credentials creds1 = new Credentials();
- final Token> token1 = mock(Token.class);
- creds1.addToken(new Text("token"), token1);
- doReturn(true).when(token1).isManaged();
- doAnswer(new Answer() {
- public Long answer(InvocationOnMock invocation)
- throws InterruptedException, BrokenBarrierException {
- startBarrier.await();
- endBarrier.await();
- return Long.MAX_VALUE;
- }}).when(token1).renew(any(Configuration.class));
-
- // this dummy token fakes renewing
- final Credentials creds2 = new Credentials();
- final Token> token2 = mock(Token.class);
- creds2.addToken(new Text("token"), token2);
- doReturn(true).when(token2).isManaged();
- doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
-
- // fire up the renewer
- final DelegationTokenRenewer dtr = new DelegationTokenRenewer();
- dtr.init(conf);
- RMContext mockContext = mock(RMContext.class);
- ClientRMService mockClientRMService = mock(ClientRMService.class);
- when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
- InetSocketAddress sockAddr =
- InetSocketAddress.createUnresolved("localhost", 1234);
- when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
- dtr.setRMContext(mockContext);
- dtr.start();
-
- // submit a job that blocks during renewal
- Thread submitThread = new Thread() {
- @Override
+ @Test(timeout=20000)
+ public void testConcurrentAddApplication()
+ throws IOException, InterruptedException, BrokenBarrierException {
+ final CyclicBarrier startBarrier = new CyclicBarrier(2);
+ final CyclicBarrier endBarrier = new CyclicBarrier(2);
+
+ // this token uses barriers to block during renew
+ final Credentials creds1 = new Credentials();
+ final Token> token1 = mock(Token.class);
+ creds1.addToken(new Text("token"), token1);
+ doReturn(true).when(token1).isManaged();
+ doAnswer(new Answer() {
+ public Long answer(InvocationOnMock invocation)
+ throws InterruptedException, BrokenBarrierException {
+ startBarrier.await();
+ endBarrier.await();
+ return Long.MAX_VALUE;
+ }}).when(token1).renew(any(Configuration.class));
+
+ // this dummy token fakes renewing
+ final Credentials creds2 = new Credentials();
+ final Token> token2 = mock(Token.class);
+ creds2.addToken(new Text("token"), token2);
+ doReturn(true).when(token2).isManaged();
+ doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
+
+ // fire up the renewer
+ final DelegationTokenRenewer dtr = new DelegationTokenRenewer();
+ dtr.init(conf);
+ RMContext mockContext = mock(RMContext.class);
+ ClientRMService mockClientRMService = mock(ClientRMService.class);
+ when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+ InetSocketAddress sockAddr =
+ InetSocketAddress.createUnresolved("localhost", 1234);
+ when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
+ dtr.setRMContext(mockContext);
+ when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
+ dtr.start();
+ // submit a job that blocks during renewal
+ Thread submitThread = new Thread() {
+ @Override
public void run() {
- try {
- dtr.addApplication(mock(ApplicationId.class), creds1, false);
- } catch (IOException e) {}
- }
- };
- submitThread.start();
-
+ dtr.addApplication(mock(ApplicationId.class), creds1, false, false);
+ }
+ };
+ submitThread.start();
+
// wait till 1st submit blocks, then submit another
- startBarrier.await();
- dtr.addApplication(mock(ApplicationId.class), creds2, false);
- // signal 1st to complete
- endBarrier.await();
- submitThread.join();
+ startBarrier.await();
+ dtr.addApplication(mock(ApplicationId.class), creds2, false, false);
+ // signal 1st to complete
+ endBarrier.await();
+ submitThread.join();
+ }
+
+ @Test
+ public void testInvalidDelegationTokenApplicationSubmit() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ MockRM rm = new MockRM(conf);
+ ByteBuffer tokens = ByteBuffer.wrap("BOGUS".getBytes());
+ ContainerLaunchContext amContainer =
+ ContainerLaunchContext.newInstance(
+ new HashMap(), new HashMap(),
+ new ArrayList(), new HashMap(), tokens,
+ new HashMap());
+ ApplicationSubmissionContext appSubContext =
+ ApplicationSubmissionContext.newInstance(
+ ApplicationId.newInstance(1234121, 0),
+ "BOGUS", "default", Priority.UNDEFINED, amContainer, false,
+ true, 1, Resource.newInstance(1024, 1), "BOGUS");
+ SubmitApplicationRequest request =
+ SubmitApplicationRequest.newInstance(appSubContext);
+ try {
+ rm.getClientRMService().submitApplication(request);
+ fail("Error was excepted.");
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage().contains(
+ "Bad header found in token storage"));
+ }
}
}