diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 7855042..09a9023 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer.DelegationTokenRenewerAppSubmitEvent; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -287,21 +288,35 @@ protected void submitApplication( Credentials credentials = null; try { credentials = parseCredentials(submissionContext); + DelegationTokenRenewerAppSubmitEvent submitEvent = + new DelegationTokenRenewerAppSubmitEvent(applicationId, + credentials, submissionContext.getCancelTokensWhenComplete()); + + if (isRecovered) { + // synchronously renew delegation token on recovery. + rmContext.getDelegationTokenRenewer().handleAppSubmitEvent( + submitEvent); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(applicationId, RMAppEventType.RECOVER)); + } else { + this.rmContext.getDelegationTokenRenewer() + .processDelegationTokenRenewerEvent(submitEvent); + } } catch (Exception e) { - LOG.warn( - "Unable to parse credentials.", e); + LOG.warn("Unable to parse credentials or add the application to the" + + " delegation token renewer.", e); // Sending APP_REJECTED is fine, since we assume that the // RMApp is in NEW state and thus we haven't yet informed the // scheduler about the existence of the application assert application.getState() == RMAppState.NEW; - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppRejectedEvent(applicationId, e.getMessage())); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, e.getMessage())); throw RPCUtil.getRemoteException(e); } - this.rmContext.getDelegationTokenRenewer().addApplication( - applicationId, credentials, - submissionContext.getCancelTokensWhenComplete(), isRecovered); } else { + // Dispatcher is not yet started at this time, so these RECOVER events + // enqueued should be guaranteed to be first processed when dispatcher + // gets started. this.rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER : RMAppEventType.START)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 8c9c52f..3bcc97a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -731,7 +731,9 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { * Therefore we should wait for it to finish. */ for (RMAppAttempt attempt : app.getAppAttempts().values()) { - app.dispatcher.getEventHandler().handle( + // synchronously recover attempt to ensure any incoming external events + // to be processed after the attempt processes the recover event. + attempt.handle( new RMAppAttemptEvent(attempt.getAppAttemptId(), RMAppAttemptEventType.RECOVER)); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index ce9f7ae..8afa9d1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -114,6 +114,7 @@ protected synchronized void serviceInit(Configuration conf) throws Exception { YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); renewerService = createNewThreadPoolService(conf); pendingEventQueue = new LinkedBlockingQueue(); + renewalTimer = new Timer(true); super.serviceInit(conf); } @@ -136,7 +137,6 @@ protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) { @Override protected void serviceStart() throws Exception { dtCancelThread.start(); - renewalTimer = new Timer(true); if (tokenKeepAliveEnabled) { delayedRemovalThread = new Thread(new DelayedTokenRemovalRunnable(getConfig()), @@ -151,12 +151,12 @@ protected void serviceStart() throws Exception { isServiceStarted = true; serviceStateLock.writeLock().unlock(); while(!pendingEventQueue.isEmpty()) { - processDelegationTokenRewewerEvent(pendingEventQueue.take()); + processDelegationTokenRenewerEvent(pendingEventQueue.take()); } super.serviceStart(); } - private void processDelegationTokenRewewerEvent( + public void processDelegationTokenRenewerEvent( DelegationTokenRenewerEvent evt) { serviceStateLock.readLock().lock(); try { @@ -332,15 +332,13 @@ private void addTokenToList(DelegationTokenToRenew t) { * done else false. * @throws IOException */ - public void addApplication( - ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd, - boolean isApplicationRecovered) { - processDelegationTokenRewewerEvent(new DelegationTokenRenewerAppSubmitEvent( - applicationId, ts, - shouldCancelAtEnd, isApplicationRecovered)); + public void addApplication(ApplicationId applicationId, Credentials ts, + boolean shouldCancelAtEnd) { + processDelegationTokenRenewerEvent(new DelegationTokenRenewerAppSubmitEvent(applicationId, ts, + shouldCancelAtEnd)); } - private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt) + public void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt) throws IOException { ApplicationId applicationId = evt.getApplicationId(); Credentials ts = evt.getCredentials(); @@ -493,7 +491,7 @@ private void removeFailedDelegationToken(DelegationTokenToRenew t) { * @param applicationId completed application */ public void applicationFinished(ApplicationId applicationId) { - processDelegationTokenRewewerEvent(new DelegationTokenRenewerEvent( + processDelegationTokenRenewerEvent(new DelegationTokenRenewerEvent( applicationId, DelegationTokenRenewerEventType.FINISH_APPLICATION)); } @@ -638,9 +636,7 @@ private void handleDTRenewerAppSubmitEvent( // Setup tokens for renewal DelegationTokenRenewer.this.handleAppSubmitEvent(event); rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(event.getApplicationId(), - event.isApplicationRecovered() ? RMAppEventType.RECOVER - : RMAppEventType.START)); + .handle(new RMAppEvent(event.getApplicationId(), RMAppEventType.START)); } catch (Throwable t) { LOG.warn( "Unable to add the application to the delegation token renewer.", @@ -654,20 +650,17 @@ private void handleDTRenewerAppSubmitEvent( } } - class DelegationTokenRenewerAppSubmitEvent extends + public static class DelegationTokenRenewerAppSubmitEvent extends DelegationTokenRenewerEvent { private Credentials credentials; private boolean shouldCancelAtEnd; - private boolean isAppRecovered; public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId, - Credentials credentails, boolean shouldCancelAtEnd, - boolean isApplicationRecovered) { + Credentials credentails, boolean shouldCancelAtEnd) { super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION); this.credentials = credentails; this.shouldCancelAtEnd = shouldCancelAtEnd; - this.isAppRecovered = isApplicationRecovered; } public Credentials getCredentials() { @@ -677,10 +670,6 @@ public Credentials getCredentials() { public boolean shouldCancelAtEnd() { return shouldCancelAtEnd; } - - public boolean isApplicationRecovered() { - return isAppRecovered; - } } enum DelegationTokenRenewerEventType { @@ -688,7 +677,7 @@ public boolean isApplicationRecovered() { FINISH_APPLICATION } - class DelegationTokenRenewerEvent extends + static class DelegationTokenRenewerEvent extends AbstractEvent { private ApplicationId appId; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index ad2e17f..31f81eb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1709,6 +1709,61 @@ public void testDecomissionedNMsMetricsOnRMRestart() throws Exception { rm2.stop(); } + @Test + public void testSynchronouslyRenewDTOnRecovery() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + final MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app0 = rm1.submitApp(200); + final MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1); + + // start RM + MockRM rm2 = new MockRM(conf, memStore) { + @Override + protected ResourceTrackerService createResourceTrackerService() { + return new ResourceTrackerService(this.rmContext, + this.nodesListManager, this.nmLivelinessMonitor, + this.rmContext.getContainerTokenSecretManager(), + this.rmContext.getNMTokenSecretManager()) { + @Override + protected void serviceStart() throws Exception { + // send the container_finished event as soon as the + // ResourceTrackerService is started. + super.serviceStart(); + nm1.setResourceTrackerService(getResourceTrackerService()); + List status = new ArrayList(); + ContainerId amContainer = + ContainerId.newInstance(am0.getApplicationAttemptId(), 1); + status.add(ContainerStatus.newInstance(amContainer, + ContainerState.COMPLETE, "AM container exit", 143)); + nm1.registerNode(status); + } + }; + } + }; + + rm2.start(); + + // wait for the 2nd attempt to be started. + RMApp loadedApp0 = + rm2.getRMContext().getRMApps().get(app0.getApplicationId()); + int timeoutSecs = 0; + while (loadedApp0.getAppAttempts().size() != 2 && timeoutSecs++ < 40) { + Thread.sleep(200); + } + MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1); + MockRM.finishApplicationMaster(loadedApp0, rm2, nm1, am1); + } + private void writeToHostsFile(String... hosts) throws IOException { if (!hostFile.exists()) { TEMP_DIR.mkdirs(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index a6ad9b6..5555159 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -353,7 +353,7 @@ public void testDTRenewal () throws Exception { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - delegationTokenRenewer.addApplication(applicationId_0, ts, true, false); + delegationTokenRenewer.addApplication(applicationId_0, ts, true); waitForEventsToGetProcessed(delegationTokenRenewer); // first 3 initial renewals + 1 real @@ -393,7 +393,7 @@ public void testDTRenewal () throws Exception { ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); - delegationTokenRenewer.addApplication(applicationId_1, ts, true, false); + delegationTokenRenewer.addApplication(applicationId_1, ts, true); waitForEventsToGetProcessed(delegationTokenRenewer); delegationTokenRenewer.applicationFinished(applicationId_1); waitForEventsToGetProcessed(delegationTokenRenewer); @@ -429,7 +429,7 @@ public void testAppRejectionWithCancelledDelegationToken() throws Exception { // register the tokens for renewal ApplicationId appId = BuilderUtils.newApplicationId(0, 0); - delegationTokenRenewer.addApplication(appId, ts, true, false); + delegationTokenRenewer.addApplication(appId, ts, true); int waitCnt = 20; while (waitCnt-- >0) { if (!eventQueue.isEmpty()) { @@ -473,7 +473,7 @@ public void testDTRenewalWithNoCancel () throws Exception { ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); - delegationTokenRenewer.addApplication(applicationId_1, ts, false, false); + delegationTokenRenewer.addApplication(applicationId_1, ts, false); waitForEventsToGetProcessed(delegationTokenRenewer); delegationTokenRenewer.applicationFinished(applicationId_1); waitForEventsToGetProcessed(delegationTokenRenewer); @@ -540,7 +540,7 @@ public void testDTKeepAlive1 () throws Exception { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - localDtr.addApplication(applicationId_0, ts, true, false); + localDtr.addApplication(applicationId_0, ts, true); waitForEventsToGetProcessed(localDtr); if (!eventQueue.isEmpty()){ Event evt = eventQueue.take(); @@ -617,7 +617,7 @@ public void testDTKeepAlive2() throws Exception { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - localDtr.addApplication(applicationId_0, ts, true, false); + localDtr.addApplication(applicationId_0, ts, true); localDtr.applicationFinished(applicationId_0); waitForEventsToGetProcessed(delegationTokenRenewer); //Send another keep alive. @@ -718,14 +718,14 @@ public Long answer(InvocationOnMock invocation) Thread submitThread = new Thread() { @Override public void run() { - dtr.addApplication(mock(ApplicationId.class), creds1, false, false); + dtr.addApplication(mock(ApplicationId.class), creds1, false); } }; submitThread.start(); // wait till 1st submit blocks, then submit another startBarrier.await(); - dtr.addApplication(mock(ApplicationId.class), creds2, false, false); + dtr.addApplication(mock(ApplicationId.class), creds2, false); // signal 1st to complete endBarrier.await(); submitThread.join();