diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 09f6b6e..078f827 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -489,6 +489,11 @@ RM_PREFIX + "delayed.delegation-token.removal-interval-ms"; public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS = 30000l; + + /** Delegation Token renewer thread count */ + public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = + RM_PREFIX + "delegation-token-renewer.thread-count"; + public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 5; /** Whether to enable log aggregation */ public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 90c88c2..e1cfc29 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -317,7 +317,7 @@ public SubmitApplicationResponse submitApplication( try { // call RMAppManager to submit application directly rmAppManager.submitApplication(submissionContext, - System.currentTimeMillis(), false, user); + System.currentTimeMillis(), user, false, null); LOG.info("Application with id " + applicationId.getId() + " submitted by user " + user); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 55b748d..b5961c4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -229,35 +230,63 @@ protected synchronized void checkAppNumCompletedLimit() { this.applicationACLsManager.removeApplication(removeId); } } - + @SuppressWarnings("unchecked") protected void submitApplication( ApplicationSubmissionContext submissionContext, long submitTime, - boolean isRecovered, String user) throws YarnException { + String user, boolean isRecovered, RMState state) throws YarnException { ApplicationId applicationId = submissionContext.getApplicationId(); - // Validation of the ApplicationSubmissionContext needs to be completed - // here. Only those fields that are dependent on RM's configuration are - // checked here as they have to be validated whether they are part of new - // submission or just being recovered. + RMAppImpl application = + createAndPopulateNewRMApp(submissionContext, submitTime, user); - // Check whether AM resource requirements are within required limits - if (!submissionContext.getUnmanagedAM()) { - ResourceRequest amReq = BuilderUtils.newResourceRequest( - RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, - submissionContext.getResource(), 1); + if (isRecovered) { + recoverApplication(state, application); + RMAppState rmAppState = + state.getApplicationState().get(applicationId).getState(); + if (isApplicationInFinalState(rmAppState)) { + // synchronously move the application into final state as if we do it + // asynchronously then momentarily client may see the earlier finished + //application in NEW state. + application + .handle(new RMAppEvent(applicationId, RMAppEventType.RECOVER)); + return; + } + } + + if (UserGroupInformation.isSecurityEnabled()) { + Credentials credentials = null; try { - SchedulerUtils.validateResourceRequest(amReq, - scheduler.getMaximumResourceCapability()); - } catch (InvalidResourceRequestException e) { - LOG.warn("RM app submission failed in validating AM resource request" - + " for application " + applicationId, e); - throw e; + credentials = parseCredentials(submissionContext); + } catch (Exception e) { + LOG.warn( + "Unable to parse credentials.", e); + // Sending APP_REJECTED is fine, since we assume that the + // RMApp is in NEW state and thus we haven't yet informed the + // scheduler about the existence of the application + assert application.getState() == RMAppState.NEW; + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppRejectedEvent(applicationId, e.getMessage())); + throw RPCUtil.getRemoteException(e); } + this.rmContext.getDelegationTokenRenewer().addApplication( + applicationId, credentials, + submissionContext.getCancelTokensWhenComplete(), isRecovered); + } else { + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(applicationId, + isRecovered ? RMAppEventType.RECOVER : RMAppEventType.START)); } + } + private RMAppImpl createAndPopulateNewRMApp( + ApplicationSubmissionContext submissionContext, + long submitTime, String user) + throws YarnException { + ApplicationId applicationId = submissionContext.getApplicationId(); + validateResourceRequest(submissionContext); // Create RMApp - RMApp application = + RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf, submissionContext.getApplicationName(), user, submissionContext.getQueue(), @@ -274,35 +303,52 @@ protected void submitApplication( LOG.warn(message); throw RPCUtil.getRemoteException(message); } - // Inform the ACLs Manager this.applicationACLsManager.addApplication(applicationId, submissionContext.getAMContainerSpec().getApplicationACLs()); + return application; + } - try { - // Setup tokens for renewal - if (UserGroupInformation.isSecurityEnabled()) { - this.rmContext.getDelegationTokenRenewer().addApplication( - applicationId,parseCredentials(submissionContext), - submissionContext.getCancelTokensWhenComplete() - ); + private void validateResourceRequest( + ApplicationSubmissionContext submissionContext) + throws InvalidResourceRequestException { + // Validation of the ApplicationSubmissionContext needs to be completed + // here. Only those fields that are dependent on RM's configuration are + // checked here as they have to be validated whether they are part of new + // submission or just being recovered. + + // Check whether AM resource requirements are within required limits + if (!submissionContext.getUnmanagedAM()) { + ResourceRequest amReq = BuilderUtils.newResourceRequest( + RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, + submissionContext.getResource(), 1); + try { + SchedulerUtils.validateResourceRequest(amReq, + scheduler.getMaximumResourceCapability()); + } catch (InvalidResourceRequestException e) { + LOG.warn("RM app submission failed in validating AM resource request" + + " for application " + submissionContext.getApplicationId(), e); + throw e; } - } catch (IOException ie) { - LOG.warn( - "Unable to add the application to the delegation token renewer.", - ie); - // Sending APP_REJECTED is fine, since we assume that the - // RMApp is in NEW state and thus we havne't yet informed the - // Scheduler about the existence of the application - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppRejectedEvent(applicationId, ie.getMessage())); - throw RPCUtil.getRemoteException(ie); } + } - if (!isRecovered) { - // All done, start the RMApp - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, RMAppEventType.START)); + private void recoverApplication(RMState state, RMAppImpl application) + throws YarnException { + try { + application.recover(state); + } catch (Exception e) { + LOG.error("Error recovering application", e); + throw new YarnException(e); + } + } + + private boolean isApplicationInFinalState(RMAppState rmAppState) { + if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED + || rmAppState == RMAppState.KILLED) { + return true; + } else { + return false; } } @@ -328,17 +374,9 @@ public void recover(RMState state) throws Exception { LOG.info("Recovering " + appStates.size() + " applications"); for (ApplicationState appState : appStates.values()) { LOG.info("Recovering application " + appState.getAppId()); + submitApplication(appState.getApplicationSubmissionContext(), - appState.getSubmitTime(), true, appState.getUser()); - // re-populate attempt information in application - RMAppImpl appImpl = - (RMAppImpl) rmContext.getRMApps().get(appState.getAppId()); - appImpl.recover(state); - // Recover the app synchronously, as otherwise client is possible to see - // the application not recovered before it is actually recovered because - // ClientRMService is already started at this point of time. - appImpl.handle(new RMAppEvent(appImpl.getApplicationId(), - RMAppEventType.RECOVER)); + appState.getSubmitTime(), appState.getUser(), true, state); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index a58b917..d778c64 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -33,8 +33,11 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -50,8 +53,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Service to renew application delegation tokens. @@ -72,7 +79,12 @@ // delegation token canceler thread private DelegationTokenCancelThread dtCancelThread = new DelegationTokenCancelThread(); + private ExecutorService renewerService; + // Only for testing + // Used to check how many events are pending. + private AtomicInteger renewerCount; + // managing the list of tokens using Map // appId=>List private Set delegationTokens = @@ -85,8 +97,7 @@ private Thread delayedRemovalThread; private boolean isServiceStarted = false; - private List pendingTokenForRenewal = - new ArrayList(); + private LinkedBlockingQueue pendingEventQueue; private boolean tokenKeepAliveEnabled; @@ -102,6 +113,15 @@ protected synchronized void serviceInit(Configuration conf) throws Exception { this.tokenRemovalDelayMs = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); + int nThreads = conf.getInt( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT); + ThreadFactory tf = new ThreadFactoryBuilder() + .setNameFormat("DelegationTokenRenewer #%d") + .build(); + renewerService = Executors.newFixedThreadPool(nThreads, tf); + pendingEventQueue = new LinkedBlockingQueue(); + renewerCount = new AtomicInteger(0); super.serviceInit(conf); } @@ -119,21 +139,30 @@ protected void serviceStart() throws Exception { RMDelegationTokenIdentifier.Renewer.setSecretManager( rmContext.getRMDelegationTokenSecretManager(), rmContext.getClientRMService().getBindAddress()); - // Delegation token renewal is delayed until ClientRMService starts. As - // it is required to short circuit the token renewal calls. isServiceStarted = true; - renewIfServiceIsStarted(pendingTokenForRenewal); - pendingTokenForRenewal.clear(); + while(!pendingEventQueue.isEmpty()) { + processDelegationTokenRewewerEvent(pendingEventQueue.take()); + } super.serviceStart(); } + private void processDelegationTokenRewewerEvent( + DelegationTokenRenewerEvent evt) { + if (isServiceStarted) { + renewerCount.incrementAndGet(); + renewerService.execute(new DelegationTokenRenewerRunnable(evt)); + } else { + pendingEventQueue.add(evt); + } + } + @Override protected void serviceStop() { if (renewalTimer != null) { renewalTimer.cancel(); } delegationTokens.clear(); - + this.renewerService.shutdown(); dtCancelThread.interrupt(); try { dtCancelThread.join(1000); @@ -290,47 +319,50 @@ private void addTokenToList(DelegationTokenToRenew t) { * @throws IOException */ public void addApplication( - ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd) + ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd, + boolean isApplicationRecovered) { + processDelegationTokenRewewerEvent(new DelegationTokenRenewerAppSubmitEvent( + applicationId, ts, + shouldCancelAtEnd, isApplicationRecovered)); + } + + private void addApplication(DelegationTokenRenewerAppSubmitEvent evt) throws IOException { + ApplicationId applicationId = evt.getApplicationId(); + Credentials ts = evt.getCredentials(); + boolean shouldCancelAtEnd = evt.shouldCancelAtEnd(); if (ts == null) { - return; //nothing to add + return; // nothing to add } - + if (LOG.isDebugEnabled()) { - LOG.debug("Registering tokens for renewal for:" + + LOG.debug("Registering tokens for renewal for:" + " appId = " + applicationId); } - - Collection > tokens = ts.getAllTokens(); + + Collection> tokens = ts.getAllTokens(); long now = System.currentTimeMillis(); - + // find tokens for renewal, but don't add timers until we know // all renewable tokens are valid // At RM restart it is safe to assume that all the previously added tokens // are valid List tokenList = new ArrayList(); - for(Token token : tokens) { + for (Token token : tokens) { if (token.isManaged()) { tokenList.add(new DelegationTokenToRenew(applicationId, token, getConfig(), now, shouldCancelAtEnd)); } } - if (!tokenList.isEmpty()){ - renewIfServiceIsStarted(tokenList); - } - } - - protected void renewIfServiceIsStarted(List dtrs) - throws IOException { - if (isServiceStarted) { + if (!tokenList.isEmpty()) { // Renewing token and adding it to timer calls are separated purposefully // If user provides incorrect token then it should not be added for // renewal. - for (DelegationTokenToRenew dtr : dtrs) { + for (DelegationTokenToRenew dtr : tokenList) { renewToken(dtr); } - for (DelegationTokenToRenew dtr : dtrs) { + for (DelegationTokenToRenew dtr : tokenList) { addTokenToList(dtr); setTimerForTokenRenewal(dtr); if (LOG.isDebugEnabled()) { @@ -338,11 +370,9 @@ protected void renewIfServiceIsStarted(List dtrs) + dtr.token.getService() + " for appId = " + dtr.applicationId); } } - } else { - pendingTokenForRenewal.addAll(dtrs); } } - + /** * Task - to renew a token * @@ -449,14 +479,20 @@ private void removeFailedDelegationToken(DelegationTokenToRenew t) { * @param applicationId completed application */ public void applicationFinished(ApplicationId applicationId) { + processDelegationTokenRewewerEvent(new DelegationTokenRenewerEvent( + applicationId, + DelegationTokenRenewerEventType.FINISH_APPLICATION)); + } + + private void applicationFinished(DelegationTokenRenewerEvent evt) { if (!tokenKeepAliveEnabled) { - removeApplicationFromRenewal(applicationId); + removeApplicationFromRenewal(evt.getApplicationId()); } else { - delayedRemovalMap.put(applicationId, System.currentTimeMillis() + delayedRemovalMap.put(evt.getApplicationId(), System.currentTimeMillis() + tokenRemovalDelayMs); } } - + /** * Add a list of applications to the keep alive list. If an appId already * exists, update it's keep-alive time. @@ -546,4 +582,61 @@ public void run() { public void setRMContext(RMContext rmContext) { this.rmContext = rmContext; } + + /* + * This will run as a separate thread and will process individual events. It + * is done in this way to make sure that the token renewal as a part of + * application submission and token removal as a part of application finish + * is asynchronous in nature. + */ + private final class DelegationTokenRenewerRunnable + implements Runnable { + + private DelegationTokenRenewerEvent evt; + + public DelegationTokenRenewerRunnable(DelegationTokenRenewerEvent evt) { + this.evt = evt; + } + + @Override + public void run() { + if (evt instanceof DelegationTokenRenewerAppSubmitEvent) { + DelegationTokenRenewerAppSubmitEvent appSubmitEvt = + (DelegationTokenRenewerAppSubmitEvent) evt; + handleDTRenewerEvent(appSubmitEvt); + } else if (evt.getType().equals( + DelegationTokenRenewerEventType.FINISH_APPLICATION)) { + DelegationTokenRenewer.this.applicationFinished(evt); + } + DelegationTokenRenewer.this.renewerCount.decrementAndGet(); + } + + @SuppressWarnings("unchecked") + private void handleDTRenewerEvent( + DelegationTokenRenewerAppSubmitEvent event) { + try { + // Setup tokens for renewal + DelegationTokenRenewer.this.addApplication(event); + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(event.getApplicationId(), + event.isApplicationRecovered() ? RMAppEventType.RECOVER + : RMAppEventType.START)); + } catch (Throwable t) { + LOG.warn( + "Unable to add the application to the delegation token renewer.", + t); + // Sending APP_REJECTED is fine, since we assume that the + // RMApp is in NEW state and thus we havne't yet informed the + // Scheduler about the existence of the application + rmContext.getDispatcher().getEventHandler().handle( + new RMAppRejectedEvent(event.getApplicationId(), t.getMessage())); + } + } + } + + @Private + @VisibleForTesting + public int getInProcessDelegationTokenRenewerEventsCount() { + return this.renewerCount.get(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerAppSubmitEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerAppSubmitEvent.java new file mode 100644 index 0000000..91ebef9 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerAppSubmitEvent.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.security; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; + + +public class DelegationTokenRenewerAppSubmitEvent extends + DelegationTokenRenewerEvent { + + private Credentials credentials; + private boolean shouldCancelAtEnd; + private boolean isAppRecovered; + + public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId, + Credentials credentails, boolean shouldCancelAtEnd, + boolean isApplicationRecovered) { + super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION); + this.credentials = credentails; + this.shouldCancelAtEnd = shouldCancelAtEnd; + this.isAppRecovered = isApplicationRecovered; + } + + public Credentials getCredentials() { + return credentials; + } + + public boolean shouldCancelAtEnd() { + return shouldCancelAtEnd; + } + + public boolean isApplicationRecovered() { + return isAppRecovered; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEvent.java new file mode 100644 index 0000000..3964f7a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.security; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class DelegationTokenRenewerEvent extends + AbstractEvent { + + private ApplicationId appId; + + public DelegationTokenRenewerEvent(ApplicationId appId, + DelegationTokenRenewerEventType type) { + super(type); + this.appId = appId; + } + + public ApplicationId getApplicationId() { + return appId; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEventType.java new file mode 100644 index 0000000..6074be5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewerEventType.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.security; + +public enum DelegationTokenRenewerEventType { + VERIFY_AND_START_APPLICATION, + FINISH_APPLICATION +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 6698412..8fe8de4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -172,7 +172,7 @@ public void submitApplication( ApplicationSubmissionContext submissionContext, String user) throws YarnException { super.submitApplication(submissionContext, System.currentTimeMillis(), - false, user); + user, false, null); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 97f51a2..36dcd38 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -817,6 +817,10 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer() MockRM rm2 = new TestSecurityMockRM(conf, memStore); rm2.start(); + // Need to wait for a while as now token renewal happens on another thread + // and is asynchronous in nature. + waitForTokensToBeRenewed(rm2); + // verify tokens are properly populated back to rm2 DelegationTokenRenewer Assert.assertEquals(tokenSet, rm2.getRMContext() .getDelegationTokenRenewer().getDelegationTokens()); @@ -826,6 +830,21 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer() rm2.stop(); } + private void waitForTokensToBeRenewed(MockRM rm2) throws Exception { + int waitCnt = 20; + boolean atleastOneAppInNEWState = true; + while (waitCnt-- > 0 && atleastOneAppInNEWState) { + atleastOneAppInNEWState = false; + for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) { + if (rmApp.getState() == RMAppState.NEW) { + Thread.sleep(1000); + atleastOneAppInNEWState = true; + break; + } + } + } + } + @Test public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 98e6ab0..b829d23 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -21,8 +21,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -31,13 +31,21 @@ import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.LinkedBlockingQueue; + +import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -46,16 +54,29 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.security.token.delegation.DelegationKey; -import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.After; import org.junit.Before; @@ -66,14 +87,17 @@ /** * unit test - - * tests addition/deletion/cancelation of renewals of delegation tokens + * tests addition/deletion/cancellation of renewals of delegation tokens * */ +@SuppressWarnings("rawtypes") public class TestDelegationTokenRenewer { private static final Log LOG = LogFactory.getLog(TestDelegationTokenRenewer.class); private static final Text KIND = new Text("TestDelegationTokenRenewer.Token"); + private static BlockingQueue eventQueue; + private static AsyncDispatcher dispatcher; public static class Renewer extends TokenRenewer { private static int counter = 0; private static Token lastRenewed = null; @@ -143,11 +167,19 @@ public static void setUpClass() throws Exception { @Before public void setUp() throws Exception { + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + eventQueue = new LinkedBlockingQueue(); + dispatcher = new AsyncDispatcher(eventQueue); Renewer.reset(); delegationTokenRenewer = new DelegationTokenRenewer(); delegationTokenRenewer.init(conf); RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); + when(mockContext.getDelegationTokenRenewer()).thenReturn( + delegationTokenRenewer); + when(mockContext.getDispatcher()).thenReturn(dispatcher); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); @@ -285,7 +317,7 @@ static MyToken createTokens(Text renewer) * @throws IOException * @throws URISyntaxException */ - @Test + @Test(timeout=60000) public void testDTRenewal () throws Exception { MyFS dfs = (MyFS)FileSystem.get(conf); LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode()); @@ -316,8 +348,9 @@ public void testDTRenewal () throws Exception { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - delegationTokenRenewer.addApplication(applicationId_0, ts, true); - + delegationTokenRenewer.addApplication(applicationId_0, ts, true, false); + waitForEventsToGetProcessed(delegationTokenRenewer); + // first 3 initial renewals + 1 real int numberOfExpectedRenewals = 3+1; @@ -355,9 +388,10 @@ public void testDTRenewal () throws Exception { ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); - delegationTokenRenewer.addApplication(applicationId_1, ts, true); + delegationTokenRenewer.addApplication(applicationId_1, ts, true, false); + waitForEventsToGetProcessed(delegationTokenRenewer); delegationTokenRenewer.applicationFinished(applicationId_1); - + waitForEventsToGetProcessed(delegationTokenRenewer); numberOfExpectedRenewals = Renewer.counter; // number of renewals so far try { Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew @@ -377,7 +411,7 @@ public void testDTRenewal () throws Exception { } } - @Test + @Test(timeout=60000) public void testInvalidDTWithAddApplication() throws Exception { MyFS dfs = (MyFS)FileSystem.get(conf); LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode()); @@ -390,12 +424,21 @@ public void testInvalidDTWithAddApplication() throws Exception { // register the tokens for renewal ApplicationId appId = BuilderUtils.newApplicationId(0, 0); - try { - delegationTokenRenewer.addApplication(appId, ts, true); - fail("App submission with a cancelled token should have failed"); - } catch (InvalidToken e) { - // expected + delegationTokenRenewer.addApplication(appId, ts, true, false); + int waitCnt = 20; + while (waitCnt-- >0) { + if (!eventQueue.isEmpty()) { + Event evt = eventQueue.take(); + if (evt.getType() == RMAppEventType.APP_REJECTED) { + Assert.assertTrue( + ((RMAppEvent) evt).getApplicationId().equals(appId)); + return; + } + } else { + Thread.sleep(500); + } } + fail("App submission with a cancelled token should have failed"); } /** @@ -408,7 +451,7 @@ public void testInvalidDTWithAddApplication() throws Exception { * @throws IOException * @throws URISyntaxException */ - @Test + @Test(timeout=60000) public void testDTRenewalWithNoCancel () throws Exception { MyFS dfs = (MyFS)FileSystem.get(conf); LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode()); @@ -425,9 +468,10 @@ public void testDTRenewalWithNoCancel () throws Exception { ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); - delegationTokenRenewer.addApplication(applicationId_1, ts, false); + delegationTokenRenewer.addApplication(applicationId_1, ts, false, false); + waitForEventsToGetProcessed(delegationTokenRenewer); delegationTokenRenewer.applicationFinished(applicationId_1); - + waitForEventsToGetProcessed(delegationTokenRenewer); int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far try { Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew @@ -454,7 +498,7 @@ public void testDTRenewalWithNoCancel () throws Exception { * @throws IOException * @throws URISyntaxException */ - @Test + @Test(timeout=60000) public void testDTKeepAlive1 () throws Exception { DelegationTokenRenewer localDtr = new DelegationTokenRenewer(); Configuration lconf = new Configuration(conf); @@ -469,6 +513,9 @@ public void testDTKeepAlive1 () throws Exception { RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); + when(mockContext.getDelegationTokenRenewer()).thenReturn( + localDtr); + when(mockContext.getDispatcher()).thenReturn(dispatcher); InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); @@ -487,16 +534,25 @@ public void testDTKeepAlive1 () throws Exception { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - localDtr.addApplication(applicationId_0, ts, true); + localDtr.addApplication(applicationId_0, ts, true, false); + waitForEventsToGetProcessed(localDtr); + if (!eventQueue.isEmpty()){ + Event evt = eventQueue.take(); + if (evt instanceof RMAppEvent) { + Assert.assertEquals(((RMAppEvent)evt).getType(), RMAppEventType.START); + } else { + fail("RMAppEvent.START was expected!!"); + } + } + localDtr.applicationFinished(applicationId_0); - - Thread.sleep(3000l); + waitForEventsToGetProcessed(localDtr); //Token should still be around. Renewal should not fail. token1.renew(lconf); //Allow the keepalive time to run out - Thread.sleep(6000l); + Thread.sleep(10000l); //The token should have been cancelled at this point. Renewal will fail. try { @@ -518,7 +574,7 @@ public void testDTKeepAlive1 () throws Exception { * @throws IOException * @throws URISyntaxException */ - @Test + @Test(timeout=60000) public void testDTKeepAlive2() throws Exception { DelegationTokenRenewer localDtr = new DelegationTokenRenewer(); Configuration lconf = new Configuration(conf); @@ -533,6 +589,9 @@ public void testDTKeepAlive2() throws Exception { RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); + when(mockContext.getDelegationTokenRenewer()).thenReturn( + localDtr); + when(mockContext.getDispatcher()).thenReturn(dispatcher); InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); @@ -551,22 +610,18 @@ public void testDTKeepAlive2() throws Exception { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - localDtr.addApplication(applicationId_0, ts, true); + localDtr.addApplication(applicationId_0, ts, true, false); localDtr.applicationFinished(applicationId_0); - - Thread.sleep(4000l); - + waitForEventsToGetProcessed(delegationTokenRenewer); //Send another keep alive. localDtr.updateKeepAliveApplications(Collections .singletonList(applicationId_0)); //Renewal should not fail. token1.renew(lconf); - //Token should be around after this. Thread.sleep(4500l); //Renewal should not fail. - ~1.5 seconds for keepalive timeout. token1.renew(lconf); - //Allow the keepalive time to run out Thread.sleep(3000l); //The token should have been cancelled at this point. Renewal will fail. @@ -575,61 +630,99 @@ public void testDTKeepAlive2() throws Exception { fail("Renewal of cancelled token should have failed"); } catch (InvalidToken ite) {} } + + + private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr) + throws InterruptedException { + int wait = 20; + while (wait-- > 0 + && dtr.getInProcessDelegationTokenRenewerEventsCount() > 0) { + Thread.sleep(200); + } + } - @Test(timeout=20000) - public void testConncurrentAddApplication() - throws IOException, InterruptedException, BrokenBarrierException { - final CyclicBarrier startBarrier = new CyclicBarrier(2); - final CyclicBarrier endBarrier = new CyclicBarrier(2); - - // this token uses barriers to block during renew - final Credentials creds1 = new Credentials(); - final Token token1 = mock(Token.class); - creds1.addToken(new Text("token"), token1); - doReturn(true).when(token1).isManaged(); - doAnswer(new Answer() { - public Long answer(InvocationOnMock invocation) - throws InterruptedException, BrokenBarrierException { - startBarrier.await(); - endBarrier.await(); - return Long.MAX_VALUE; - }}).when(token1).renew(any(Configuration.class)); - - // this dummy token fakes renewing - final Credentials creds2 = new Credentials(); - final Token token2 = mock(Token.class); - creds2.addToken(new Text("token"), token2); - doReturn(true).when(token2).isManaged(); - doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class)); - - // fire up the renewer - final DelegationTokenRenewer dtr = new DelegationTokenRenewer(); - dtr.init(conf); - RMContext mockContext = mock(RMContext.class); - ClientRMService mockClientRMService = mock(ClientRMService.class); - when(mockContext.getClientRMService()).thenReturn(mockClientRMService); - InetSocketAddress sockAddr = - InetSocketAddress.createUnresolved("localhost", 1234); - when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); - dtr.setRMContext(mockContext); - dtr.start(); - - // submit a job that blocks during renewal - Thread submitThread = new Thread() { - @Override + @Test(timeout=20000) + public void testConcurrentAddApplication() + throws IOException, InterruptedException, BrokenBarrierException { + final CyclicBarrier startBarrier = new CyclicBarrier(2); + final CyclicBarrier endBarrier = new CyclicBarrier(2); + + // this token uses barriers to block during renew + final Credentials creds1 = new Credentials(); + final Token token1 = mock(Token.class); + creds1.addToken(new Text("token"), token1); + doReturn(true).when(token1).isManaged(); + doAnswer(new Answer() { + public Long answer(InvocationOnMock invocation) + throws InterruptedException, BrokenBarrierException { + startBarrier.await(); + endBarrier.await(); + return Long.MAX_VALUE; + }}).when(token1).renew(any(Configuration.class)); + + // this dummy token fakes renewing + final Credentials creds2 = new Credentials(); + final Token token2 = mock(Token.class); + creds2.addToken(new Text("token"), token2); + doReturn(true).when(token2).isManaged(); + doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class)); + + // fire up the renewer + final DelegationTokenRenewer dtr = new DelegationTokenRenewer(); + dtr.init(conf); + RMContext mockContext = mock(RMContext.class); + ClientRMService mockClientRMService = mock(ClientRMService.class); + when(mockContext.getClientRMService()).thenReturn(mockClientRMService); + InetSocketAddress sockAddr = + InetSocketAddress.createUnresolved("localhost", 1234); + when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); + dtr.setRMContext(mockContext); + when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr); + dtr.start(); + // submit a job that blocks during renewal + Thread submitThread = new Thread() { + @Override public void run() { - try { - dtr.addApplication(mock(ApplicationId.class), creds1, false); - } catch (IOException e) {} - } - }; - submitThread.start(); - + dtr.addApplication(mock(ApplicationId.class), creds1, false, false); + } + }; + submitThread.start(); + // wait till 1st submit blocks, then submit another - startBarrier.await(); - dtr.addApplication(mock(ApplicationId.class), creds2, false); - // signal 1st to complete - endBarrier.await(); - submitThread.join(); + startBarrier.await(); + dtr.addApplication(mock(ApplicationId.class), creds2, false, false); + // signal 1st to complete + endBarrier.await(); + submitThread.join(); + } + + @Test(timeout=20000) + public void testInvalidDelegationTokenApplicationSubmit() throws Exception { + Configuration conf = new Configuration(); + conf.set( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + MockRM rm = new MockRM(conf); + ByteBuffer tokens = ByteBuffer.wrap("BOGUS".getBytes()); + ContainerLaunchContext amContainer = + ContainerLaunchContext.newInstance( + new HashMap(), new HashMap(), + new ArrayList(), new HashMap(), tokens, + new HashMap()); + ApplicationSubmissionContext appSubContext = + ApplicationSubmissionContext.newInstance( + ApplicationId.newInstance(1234121, 0), + "BOGUS", "default", Priority.UNDEFINED, amContainer, false, + true, 1, Resource.newInstance(1024, 1), "BOGUS"); + SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(appSubContext); + try { + rm.getClientRMService().submitApplication(request); + fail("Error was excepted."); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().contains( + "Bad header found in token storage")); + } } }