diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationMasterNotRegisteredException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationMasterNotRegisteredException.java new file mode 100644 index 0000000..9a75b29 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationMasterNotRegisteredException.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.exceptions; + +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; + +/** + * This exception is thrown when an Application Master tries to unregister by calling + * {@link ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)} + * API without first registering by calling + * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)} + * or after an RM restart. The ApplicationMaster is expected to call + * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)} + * and retry. + */ + +public class ApplicationMasterNotRegisteredException extends YarnException { + + private static final long serialVersionUID = 13498238L; + + public ApplicationMasterNotRegisteredException(Throwable cause) { super(cause);} + + public ApplicationMasterNotRegisteredException(String message) { super(message); } + + public ApplicationMasterNotRegisteredException(String message, Throwable + cause) { + super(message, cause); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java index 6b422b3..6fe6f27 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java @@ -24,10 +24,8 @@ /** * This exception is thrown when an ApplicationMaster asks for resources by - * calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} or tries - * to unregister by calling - * {@link ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)} - * API without first registering by calling + * calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} + * without first registering by calling * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)} * or if it tries to register more than once. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 94dc474..e60add4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; @@ -107,12 +108,15 @@ new ConcurrentHashMap(); private final AllocateResponse resync = recordFactory.newRecordInstance(AllocateResponse.class); + private final AllocateResponse shutdown = + recordFactory.newRecordInstance(AllocateResponse.class); private final RMContext rmContext; public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { super(ApplicationMasterService.class.getName()); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.rScheduler = scheduler; + this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN); this.resync.setAMCommand(AMCommand.AM_RESYNC); this.rmContext = rmContext; } @@ -346,9 +350,9 @@ public FinishApplicationMasterResponse finishApplicationMaster( AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService", message, applicationAttemptId.getApplicationId(), applicationAttemptId); - throw new InvalidApplicationMasterRequestException(message); + throw new ApplicationMasterNotRegisteredException(message); } - + this.amLivelinessMonitor.receivedPing(applicationAttemptId); RMApp rmApp = @@ -409,22 +413,23 @@ public AllocateResponse allocate(AllocateRequest request) AllocateResponseLock lock = responseMap.get(appAttemptId); if (lock == null) { LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); - return resync; + return shutdown; } synchronized (lock) { AllocateResponse lastResponse = lock.getAllocateResponse(); if (!hasApplicationMasterRegistered(appAttemptId)) { String message = - "Application Master is trying to allocate before registering for: " - + appAttemptId.getApplicationId(); - LOG.error(message); + "Application Master is not registered for known application: " + + appAttemptId.getApplicationId() + + ". Let AM resync."; + LOG.info(message); RMAuditLogger.logFailure( this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) .getUser(), AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message, appAttemptId.getApplicationId(), appAttemptId); - throw new InvalidApplicationMasterRequestException(message); + return resync; } if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 3f9ef64..2526150 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -783,7 +783,8 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { } // Notify scheduler about the app on recovery - new AddApplicationToSchedulerTransition().transition(app, event); + app.handler.handle(new AppAddedSchedulerEvent(app.applicationId, + app.submissionContext.getQueue(), app.user)); // recover attempts app.recoverAppAttempts(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 5e71c93..4ad7572 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -884,8 +884,12 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } else { // Add the current attempt to the scheduler. if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) { + // Need to register an app attempt before AM can register + appAttempt.masterService + .registerAppAttempt(appAttempt.applicationAttemptId); + appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( - appAttempt.getAppAttemptId(), false)); + appAttempt.getAppAttemptId(), false, false)); } /* diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 5de407d..171487a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -557,7 +557,8 @@ private synchronized void addApplication(ApplicationId applicationId, private synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, - boolean transferStateFromPreviousAttempt) { + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); CSQueue queue = (CSQueue) application.getQueue(); @@ -575,9 +576,15 @@ private synchronized void addApplicationAttempt( LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user " + application.getUser() + " in queue " + queue.getQueueName()); - rmContext.getDispatcher().getEventHandler() .handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + if (shouldNotifyAttemptAdded) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping notifying ATTEMPT_ADDED"); + } + } } private synchronized void doneApplication(ApplicationId applicationId, @@ -911,7 +918,8 @@ public void handle(SchedulerEvent event) { AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getShouldNotifyAttemptAdded()); } break; case APP_ATTEMPT_REMOVED: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java index d31010d..64d308a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java @@ -24,13 +24,22 @@ private final ApplicationAttemptId applicationAttemptId; private final boolean transferStateFromPreviousAttempt; + private final boolean shouldNotifyAttemptAdded; public AppAttemptAddedSchedulerEvent( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt) { + this(applicationAttemptId, transferStateFromPreviousAttempt, true); + } + + public AppAttemptAddedSchedulerEvent( + ApplicationAttemptId applicationAttemptId, + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { super(SchedulerEventType.APP_ATTEMPT_ADDED); this.applicationAttemptId = applicationAttemptId; this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt; + this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded; } public ApplicationAttemptId getApplicationAttemptId() { @@ -40,4 +49,8 @@ public ApplicationAttemptId getApplicationAttemptId() { public boolean getTransferStateFromPreviousAttempt() { return transferStateFromPreviousAttempt; } + + public boolean getShouldNotifyAttemptAdded() { + return shouldNotifyAttemptAdded; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index ea53165..cafd1f6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -597,7 +597,8 @@ protected synchronized void addApplication(ApplicationId applicationId, */ protected synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, - boolean transferStateFromPreviousAttempt) { + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); String user = application.getUser(); @@ -625,9 +626,16 @@ protected synchronized void addApplicationAttempt( LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + + if (shouldNotifyAttemptAdded) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping notifying ATTEMPT_ADDED"); + } + } } /** @@ -1131,7 +1139,8 @@ public void handle(SchedulerEvent event) { AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getShouldNotifyAttemptAdded()); break; case APP_ATTEMPT_REMOVED: if (!(event instanceof AppAttemptRemovedSchedulerEvent)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 4681516..b017db7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -370,7 +370,8 @@ public synchronized void addApplication(ApplicationId applicationId, @VisibleForTesting public synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId, - boolean transferStateFromPreviousAttempt) { + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { SchedulerApplication application = applications.get(appAttemptId.getApplicationId()); String user = application.getUser(); @@ -388,9 +389,15 @@ public synchronized void addApplication(ApplicationId applicationId, metrics.submitAppAttempt(user); LOG.info("Added Application Attempt " + appAttemptId + " to scheduler from user " + application.getUser()); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(appAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + if (shouldNotifyAttemptAdded) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(appAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping notifying ATTEMPT_ADDED"); + } + } } private synchronized void doneApplication(ApplicationId applicationId, @@ -780,7 +787,8 @@ public void handle(SchedulerEvent event) { AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getShouldNotifyAttemptAdded()); } break; case APP_ATTEMPT_REMOVED: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 64e5cc9..36182f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -194,28 +195,17 @@ public void testallocateBeforeAMRegistration() throws Exception { // request for containers int request = 2; - try { - AllocateResponse ar = - am.allocate("h1", 1000, request, new ArrayList()); - } catch (Exception e) { - Assert.assertEquals("Application Master is trying to allocate before " - + "registering for: " + attempt.getAppAttemptId().getApplicationId(), - e.getMessage()); - thrown = true; - } + AllocateResponse ar = + am.allocate("h1", 1000, request, new ArrayList()); + Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC); + // kick the scheduler nm1.nodeHeartbeat(true); - try { - AllocateResponse amrs = - am.allocate(new ArrayList(), - new ArrayList()); - } catch (Exception e) { - Assert.assertEquals("Application Master is trying to allocate before " - + "registering for: " + attempt.getAppAttemptId().getApplicationId(), - e.getMessage()); - thrown = true; - } - Assert.assertTrue(thrown); + AllocateResponse amrs = + am.allocate(new ArrayList(), + new ArrayList()); + Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC); + am.registerAppAttempt(); thrown = false; try { @@ -228,5 +218,17 @@ public void testallocateBeforeAMRegistration() throws Exception { thrown = true; } Assert.assertTrue(thrown); + + // Simulate an AM that was disconnected and app attempt was removed + // (responseMap does not contain attemptid) + am.unregisterAppAttempt(); + nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, + ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FINISHED); + + AllocateResponse amrs2 = + am.allocate(new ArrayList(), + new ArrayList()); + Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index afe28aa..671851c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -18,60 +18,33 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import com.google.common.collect.Maps; import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; + import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.event.InlineDispatcher; -import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; - -import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; + import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.ArgumentCaptor; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.concurrent.ConcurrentMap; import static java.lang.Thread.sleep; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyList; -import static org.mockito.Mockito.*; public class TestApplicationMasterService { private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); @@ -270,13 +243,17 @@ public void testFinishApplicationMasterBeforeRegistering() throws Exception { } Assert.assertNotNull(cause); Assert - .assertTrue(cause instanceof InvalidApplicationMasterRequestException); + .assertTrue(cause instanceof ApplicationMasterNotRegisteredException); Assert.assertNotNull(cause.getMessage()); Assert .assertTrue(cause .getMessage() .contains( "Application Master is trying to unregister before registering for:")); + + am1.registerAppAttempt(); + + am1.unregisterAppAttempt(req, false); } finally { if (rm != null) { rm.stop(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index b7b77a7..aa7f631 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -238,7 +238,7 @@ public void testNodeUpdateBeforeAppAttemptInit() throws Exception { } ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1); - scheduler.addApplicationAttempt(attId, false); + scheduler.addApplicationAttempt(attId, false, true); rm.stop(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 8eed4e6..213b28d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -292,7 +292,7 @@ public void testRMRestart() throws Exception { AllocateResponse allocResponse = am1.allocate( new ArrayList(), new ArrayList()); - Assert.assertTrue(allocResponse.getAMCommand() == AMCommand.AM_RESYNC); + Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand()); // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); @@ -1647,7 +1647,7 @@ public void testQueueMetricsOnRMRestart() throws Exception { rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId()); am1.registerAppAttempt(); - am1.allocate("127.0.0.1" , 1000, 1, new ArrayList()); + am1.allocate("127.0.0.1" , 1000, 1, new ArrayList()); nm1.nodeHeartbeat(true); List conts = am1.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 6dd2992..89342af 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -535,6 +535,36 @@ public void testContainersNotRecoveredForCompletedApps() throws Exception { assertNull(scheduler.getRMContainer(completedContainer.getContainerId())); } + @Test (timeout = 600000) + public void testAppReregisterOnRMWorkPreservingRestart() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = MockRM.launchAM(app0, rm1, nm1); + + // start new RM + rm2 = new MockRM(conf, memStore); + rm2.start(); + rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED); + + am0.setAMRMProtocol(rm2.getApplicationMasterService()); + am0.registerAppAttempt(false); + + rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING); + rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING); + } + private void asserteMetrics(QueueMetrics qm, int appsSubmitted, int appsPending, int appsRunning, int appsCompleted, int allocatedContainers, int availableMB, int availableVirtualCores, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index fb864a2..dd1e4bb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -146,7 +146,7 @@ protected ApplicationAttemptId createSchedulingRequest( // This conditional is for testAclSubmitApplication where app is rejected // and no app is added. if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { - scheduler.addApplicationAttempt(id, false); + scheduler.addApplicationAttempt(id, false, true); } List ask = new ArrayList(); ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index dd42aa0..73bed39 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -787,13 +787,13 @@ public void testQueueDemandCalculation() throws Exception { ApplicationAttemptId id11 = createAppAttemptId(1, 1); scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1"); - scheduler.addApplicationAttempt(id11, false); + scheduler.addApplicationAttempt(id11, false, true); ApplicationAttemptId id21 = createAppAttemptId(2, 1); scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1"); - scheduler.addApplicationAttempt(id21, false); + scheduler.addApplicationAttempt(id21, false, true); ApplicationAttemptId id22 = createAppAttemptId(2, 2); scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1"); - scheduler.addApplicationAttempt(id22, false); + scheduler.addApplicationAttempt(id22, false, true); int minReqSize = FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB; @@ -1555,7 +1555,7 @@ public void testMultipleNodesSingleRackRequest() throws Exception { ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); scheduler.addApplication(appId.getApplicationId(), "queue1", "user1"); - scheduler.addApplicationAttempt(appId, false); + scheduler.addApplicationAttempt(appId, false, true); // 1 request with 2 nodes on the same rack. another request with 1 node on // a different rack @@ -2679,7 +2679,7 @@ public void testContinuousScheduling() throws Exception { ApplicationAttemptId appAttemptId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11"); - fs.addApplicationAttempt(appAttemptId, false); + fs.addApplicationAttempt(appAttemptId, false, true); List ask = new ArrayList(); ResourceRequest request = createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);